From e779a796f7f0db3965dfd8eeb5e01bd9eec9b4ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20I=C3=B1igo?= Date: Mon, 5 Sep 2022 16:16:08 +0200 Subject: [PATCH 1/4] feature: add several missing dbt commands According to the reference that can be found here https://docs.getdbt.com/reference/dbt-commands --- airflow_dbt/operators/dbt_operator.py | 82 +++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/airflow_dbt/operators/dbt_operator.py b/airflow_dbt/operators/dbt_operator.py index 6233d8d..0be1f08 100644 --- a/airflow_dbt/operators/dbt_operator.py +++ b/airflow_dbt/operators/dbt_operator.py @@ -142,3 +142,85 @@ def __init__(self, profiles_dir=None, target=None, *args, **kwargs): def execute(self, context): self.create_hook().run_cli('deps') + + +class DbtBuildOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('build') + + +class DbtCleanOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('clean') + + +class DbtCompileOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('compile') + + +class DbtDebugOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('debug') + + +class DbtInitOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('init') + + +class DbtListOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('list') + + +class DbtParseOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('parse') + + +class DbtSourceOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('source') + + +class DbtRunOperationOperator(DbtBaseOperator): + @apply_defaults + def __init__(self, profiles_dir=None, target=None, *args, **kwargs): + super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + + def execute(self, context): + self.create_hook().run_cli('run-operation') + From 448cdade0edbb857fa42babde35ce34a275f2a70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20I=C3=B1igo?= Date: Mon, 5 Sep 2022 17:36:30 +0200 Subject: [PATCH 2/4] test: parametrize Operators tests --- tests/operators/test_dbt_operator.py | 106 +++++++++++++-------------- 1 file changed, 52 insertions(+), 54 deletions(-) diff --git a/tests/operators/test_dbt_operator.py b/tests/operators/test_dbt_operator.py index 8ce2c5f..b30a404 100644 --- a/tests/operators/test_dbt_operator.py +++ b/tests/operators/test_dbt_operator.py @@ -1,66 +1,64 @@ import datetime -from unittest import TestCase, mock +from unittest import mock +from unittest.mock import MagicMock + +import pytest from airflow import DAG, configuration -from airflow_dbt.hooks.dbt_hook import DbtCliHook + +from airflow_dbt import DbtCliHook from airflow_dbt.operators.dbt_operator import ( + DbtBuildOperator, DbtCleanOperator, DbtCompileOperator, DbtDebugOperator, DbtDepsOperator, DbtDocsGenerateOperator, + DbtInitOperator, DbtListOperator, DbtParseOperator, DbtRunOperator, DbtSeedOperator, DbtSnapshotOperator, - DbtRunOperator, - DbtTestOperator, - DbtDepsOperator + DbtSourceOperator, DbtTestOperator, ) -class TestDbtOperator(TestCase): - def setUp(self): - configuration.conf.load_test_config() - args = { - 'owner': 'airflow', - 'start_date': datetime.datetime(2020, 2, 27) - } - self.dag = DAG('test_dag_id', default_args=args) +@pytest.fixture +def spy_cli_run(mocker) -> MagicMock: + yield mocker.patch("airflow_dbt.hooks.dbt_hook.DbtCliHook.run_cli") - @mock.patch.object(DbtCliHook, 'run_cli') - def test_dbt_run(self, mock_run_cli): - operator = DbtRunOperator( - task_id='run', - dag=self.dag - ) - operator.execute(None) - mock_run_cli.assert_called_once_with('run') - @mock.patch.object(DbtCliHook, 'run_cli') - def test_dbt_test(self, mock_run_cli): - operator = DbtTestOperator( - task_id='test', - dag=self.dag - ) - operator.execute(None) - mock_run_cli.assert_called_once_with('test') +@pytest.fixture +def mock_dag() -> MagicMock: + configuration.conf.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': datetime.datetime(2020, 2, 27) + } + yield DAG('test_dag_id', default_args=args) - @mock.patch.object(DbtCliHook, 'run_cli') - def test_dbt_snapshot(self, mock_run_cli): - operator = DbtSnapshotOperator( - task_id='snapshot', - dag=self.dag - ) - operator.execute(None) - mock_run_cli.assert_called_once_with('snapshot') - @mock.patch.object(DbtCliHook, 'run_cli') - def test_dbt_seed(self, mock_run_cli): - operator = DbtSeedOperator( - task_id='seed', - dag=self.dag - ) - operator.execute(None) - mock_run_cli.assert_called_once_with('seed') - - @mock.patch.object(DbtCliHook, 'run_cli') - def test_dbt_deps(self, mock_run_cli): - operator = DbtDepsOperator( - task_id='deps', - dag=self.dag - ) - operator.execute(None) - mock_run_cli.assert_called_once_with('deps') +@pytest.mark.parametrize( + ['operator', 'expected_command'], + [ + (DbtRunOperator, ['run']), + (DbtTestOperator, ['test']), + (DbtSnapshotOperator, ['snapshot']), + (DbtDocsGenerateOperator, ['docs', 'generate']), + (DbtSeedOperator, ['seed']), + (DbtDepsOperator, ['deps']), + (DbtBuildOperator, ['build']), + (DbtCleanOperator, ['clean']), + (DbtCompileOperator, ['compile']), + (DbtDebugOperator, ['debug']), + (DbtInitOperator, ['init']), + (DbtListOperator, ['list']), + (DbtParseOperator, ['parse']), + (DbtListOperator, ['list']), + (DbtSourceOperator, ['source']), + ] +) +@mock.patch.object(DbtCliHook, 'run_cli') +def test_operators_commands( + spy_cli_run, + operator: DbtRunOperator, + expected_command: [str], + mock_dag, +): + """Every operator passess down to the execution the correct dbt command""" + task_id = 'test_dbt_' + '_'.join(expected_command) + operator = operator(task_id=task_id, dag=mock_dag) + operator.execute(None) + spy_cli_run.assert_called_once_with(*expected_command) From 4f7535bcea861fab63acabd321301db317a33202 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20I=C3=B1igo?= Date: Mon, 5 Sep 2022 17:36:59 +0200 Subject: [PATCH 3/4] fix: operators call now supper with it's own type --- airflow_dbt/operators/dbt_operator.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/airflow_dbt/operators/dbt_operator.py b/airflow_dbt/operators/dbt_operator.py index 0be1f08..e554b27 100644 --- a/airflow_dbt/operators/dbt_operator.py +++ b/airflow_dbt/operators/dbt_operator.py @@ -147,7 +147,7 @@ def execute(self, context): class DbtBuildOperator(DbtBaseOperator): @apply_defaults def __init__(self, profiles_dir=None, target=None, *args, **kwargs): - super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + super(DbtBuildOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) def execute(self, context): self.create_hook().run_cli('build') @@ -156,7 +156,7 @@ def execute(self, context): class DbtCleanOperator(DbtBaseOperator): @apply_defaults def __init__(self, profiles_dir=None, target=None, *args, **kwargs): - super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + super(DbtCleanOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) def execute(self, context): self.create_hook().run_cli('clean') @@ -165,7 +165,7 @@ def execute(self, context): class DbtCompileOperator(DbtBaseOperator): @apply_defaults def __init__(self, profiles_dir=None, target=None, *args, **kwargs): - super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + super(DbtCompileOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) def execute(self, context): self.create_hook().run_cli('compile') @@ -174,7 +174,7 @@ def execute(self, context): class DbtDebugOperator(DbtBaseOperator): @apply_defaults def __init__(self, profiles_dir=None, target=None, *args, **kwargs): - super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + super(DbtDebugOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) def execute(self, context): self.create_hook().run_cli('debug') @@ -183,7 +183,7 @@ def execute(self, context): class DbtInitOperator(DbtBaseOperator): @apply_defaults def __init__(self, profiles_dir=None, target=None, *args, **kwargs): - super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + super(DbtInitOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) def execute(self, context): self.create_hook().run_cli('init') @@ -192,7 +192,7 @@ def execute(self, context): class DbtListOperator(DbtBaseOperator): @apply_defaults def __init__(self, profiles_dir=None, target=None, *args, **kwargs): - super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + super(DbtListOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) def execute(self, context): self.create_hook().run_cli('list') @@ -201,7 +201,7 @@ def execute(self, context): class DbtParseOperator(DbtBaseOperator): @apply_defaults def __init__(self, profiles_dir=None, target=None, *args, **kwargs): - super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + super(DbtParseOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) def execute(self, context): self.create_hook().run_cli('parse') @@ -210,7 +210,7 @@ def execute(self, context): class DbtSourceOperator(DbtBaseOperator): @apply_defaults def __init__(self, profiles_dir=None, target=None, *args, **kwargs): - super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + super(DbtSourceOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) def execute(self, context): self.create_hook().run_cli('source') @@ -219,7 +219,7 @@ def execute(self, context): class DbtRunOperationOperator(DbtBaseOperator): @apply_defaults def __init__(self, profiles_dir=None, target=None, *args, **kwargs): - super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) + super(DbtRunOperationOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs) def execute(self, context): self.create_hook().run_cli('run-operation') From 50de1f4fc91a2d07c0f12c9fefc381bb70ef0902 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20I=C3=B1igo?= Date: Mon, 5 Sep 2022 17:42:42 +0200 Subject: [PATCH 4/4] test: fix the type for a test param --- tests/operators/test_dbt_operator.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/operators/test_dbt_operator.py b/tests/operators/test_dbt_operator.py index b30a404..dcb12fd 100644 --- a/tests/operators/test_dbt_operator.py +++ b/tests/operators/test_dbt_operator.py @@ -7,7 +7,8 @@ from airflow_dbt import DbtCliHook from airflow_dbt.operators.dbt_operator import ( - DbtBuildOperator, DbtCleanOperator, DbtCompileOperator, DbtDebugOperator, DbtDepsOperator, DbtDocsGenerateOperator, + DbtBaseOperator, DbtBuildOperator, DbtCleanOperator, DbtCompileOperator, DbtDebugOperator, DbtDepsOperator, + DbtDocsGenerateOperator, DbtInitOperator, DbtListOperator, DbtParseOperator, DbtRunOperator, DbtSeedOperator, DbtSnapshotOperator, @@ -53,7 +54,7 @@ def mock_dag() -> MagicMock: @mock.patch.object(DbtCliHook, 'run_cli') def test_operators_commands( spy_cli_run, - operator: DbtRunOperator, + operator: DbtBaseOperator, expected_command: [str], mock_dag, ):