diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 4888583bb..fdf1d7d39 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -42,6 +42,7 @@ ) from cosmos.dbt.parser.output import ( extract_log_issues, + parse_output, ) logger = get_logger(__name__) @@ -350,11 +351,12 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope job_facets=job_facets, ) - def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None: + def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> FullOutputSubprocessResult: dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags) dbt_cmd = dbt_cmd or [] result = self.run_command(cmd=dbt_cmd, env=env, context=context) logger.info(result.output) + return result def execute(self, context: Context) -> None: self.build_and_run_cmd(context=context) @@ -492,6 +494,16 @@ def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) if self.on_warning_callback: self.on_warning_callback(warning_context) + def execute(self, context: Context) -> None: + result = self.build_and_run_cmd(context=context) + + if not self._should_run_tests(result): + return + + warnings = parse_output(result, "WARN") + if warnings > 0: + self._handle_warnings(result, context) + class DbtRunOperationLocalOperator(DbtLocalBaseOperator): """ diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 94b0f8e27..fdd77aec3 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -302,3 +302,35 @@ def test_operator_execute_without_flags(mock_build_and_run_cmd, operator_class): ) task.execute(context={}) mock_build_and_run_cmd.assert_called_once_with(context={}) + + +@patch("cosmos.operators.local.parse_output") +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_test_operator_execute_with_on_warning_callback(mock_build_and_run_cmd, mock_parse_output): + # simulate when there is warning + mock_parse_output.return_value = 1 + + warning_handler = MagicMock() + + test_operator = DbtTestLocalOperator( + profile_config=profile_config, project_dir="my/dir", task_id="test", on_warning_callback=warning_handler + ) + test_operator.execute(context={}) + mock_build_and_run_cmd.assert_called_once() + warning_handler.assert_called_once() + + +@patch("cosmos.operators.local.parse_output") +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_test_operator_execute_without_on_warning_callback(mock_build_and_run_cmd, mock_parse_output): + # simulate when there is no warning + mock_parse_output.return_value = 0 + + warning_handler = MagicMock() + + test_operator = DbtTestLocalOperator( + profile_config=profile_config, project_dir="my/dir", task_id="test", on_warning_callback=warning_handler + ) + test_operator.execute(context={}) + mock_build_and_run_cmd.assert_called_once() + warning_handler.assert_not_called()