diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 69bc1a78c..adf608643 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -405,10 +405,21 @@ class DbtRunLocalOperator(DbtLocalBaseOperator): ui_color = "#7352BA" ui_fgcolor = "#F4F2FC" - def __init__(self, **kwargs: Any) -> None: + def __init__(self, full_refresh: bool = False, **kwargs: Any) -> None: + self.full_refresh = full_refresh super().__init__(**kwargs) self.base_cmd = ["run"] + def add_cmd_flags(self) -> list[str]: + flags = [] + if self.full_refresh is True: + flags.append("--full-refresh") + return flags + + def execute(self, context: Context) -> None: + cmd_flags = self.add_cmd_flags() + self.build_and_run_cmd(context=context, cmd_flags=cmd_flags) + class DbtTestLocalOperator(DbtLocalBaseOperator): """ @@ -485,6 +496,10 @@ def add_cmd_flags(self) -> list[str]: flags.append(yaml.dump(self.args)) return flags + def execute(self, context: Context) -> None: + cmd_flags = self.add_cmd_flags() + self.build_and_run_cmd(context=context, cmd_flags=cmd_flags) + class DbtDocsLocalOperator(DbtLocalBaseOperator): """ diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 7d00d88c8..f926816f6 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -11,7 +11,18 @@ from pendulum import datetime from cosmos.config import ProfileConfig -from cosmos.operators.local import DbtLocalBaseOperator, DbtRunLocalOperator, DbtTestLocalOperator +from cosmos.operators.local import ( + DbtLocalBaseOperator, + DbtLSLocalOperator, + DbtSnapshotLocalOperator, + DbtRunLocalOperator, + DbtTestLocalOperator, + DbtDocsLocalOperator, + DbtDocsS3LocalOperator, + DbtDocsAzureStorageLocalOperator, + DbtSeedLocalOperator, + DbtRunOperationLocalOperator, +) from cosmos.profiles import PostgresUserPasswordProfileMapping from tests.utils import test_dag as run_test_dag @@ -228,3 +239,49 @@ def test_store_compiled_sql() -> None: tmp_project_dir="my/dir", context=Context(execution_date=datetime(2023, 2, 15, 12, 30)), ) + + +@pytest.mark.parametrize( + "operator_class,kwargs,expected_call_kwargs", + [ + (DbtSeedLocalOperator, {"full_refresh": True}, {"context": {}, "cmd_flags": ["--full-refresh"]}), + (DbtRunLocalOperator, {"full_refresh": True}, {"context": {}, "cmd_flags": ["--full-refresh"]}), + ( + DbtRunOperationLocalOperator, + {"args": {"days": 7, "dry_run": True}, "macro_name": "bla"}, + {"context": {}, "cmd_flags": ["--args", "days: 7\ndry_run: true\n"]}, + ), + ], +) +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_operator_execute_with_flags(mock_build_and_run_cmd, operator_class, kwargs, expected_call_kwargs): + task = operator_class(profile_config=profile_config, task_id="my-task", project_dir="my/dir", **kwargs) + task.execute(context={}) + mock_build_and_run_cmd.assert_called_once_with(**expected_call_kwargs) + + +@pytest.mark.parametrize( + "operator_class", + ( + DbtLSLocalOperator, + DbtSnapshotLocalOperator, + DbtTestLocalOperator, + DbtDocsLocalOperator, + DbtDocsS3LocalOperator, + DbtDocsAzureStorageLocalOperator, + ), +) +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_operator_execute_without_flags(mock_build_and_run_cmd, operator_class): + operator_class_kwargs = { + DbtDocsS3LocalOperator: {"aws_conn_id": "fake-conn", "bucket_name": "fake-bucket"}, + DbtDocsAzureStorageLocalOperator: {"azure_conn_id": "fake-conn", "container_name": "fake-container"}, + } + task = operator_class( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + **operator_class_kwargs.get(operator_class, {}), + ) + task.execute(context={}) + mock_build_and_run_cmd.assert_called_once_with(context={})