diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 4c7e706780d..a10abd565f7 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -532,6 +532,9 @@ def safe_run_hooks( started_at = datetime.utcnow() ordered_hooks = self.get_hooks_by_type(hook_type) + if hook_type == RunHookType.End and ordered_hooks: + fire_event(Formatting("")) + # on-run-* hooks should run outside a transaction. This happens because psycopg2 automatically begins a transaction when a connection is created. adapter.clear_transaction() if not ordered_hooks: @@ -541,8 +544,6 @@ def safe_run_hooks( failed = False num_hooks = len(ordered_hooks) - fire_event(Formatting("")) - for idx, hook in enumerate(ordered_hooks, 1): with log_contextvars(node_info=hook.node_info): hook.index = idx @@ -605,6 +606,9 @@ def safe_run_hooks( ) ) + if hook_type == RunHookType.Start and ordered_hooks: + fire_event(Formatting("")) + return status def print_results_line(self, results, execution_time) -> None: diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index cc562c490f2..be926e7275c 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -495,6 +495,7 @@ def print_results_line(self, node_results, elapsed): def execute_with_hooks(self, selected_uids: AbstractSet[str]): adapter = get_adapter(self.config) + fire_event(Formatting("")) fire_event( ConcurrencyLine( num_threads=self.config.threads, @@ -502,11 +503,11 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]): node_count=self.num_nodes, ) ) + fire_event(Formatting("")) self.started_at = time.time() try: before_run_status = self.before_run(adapter, selected_uids) - fire_event(Formatting("")) if before_run_status == RunStatus.Success or ( not get_flags().skip_nodes_if_on_run_start_fails @@ -531,9 +532,9 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]): node_info=node.node_info, ) ) - skipped_node = mark_node_as_skipped(node, executed_node_ids, None) - if skipped_node: - self.node_results.append(skipped_node) + skipped_node_result = mark_node_as_skipped(node, executed_node_ids, None) + if skipped_node_result: + self.node_results.append(skipped_node_result) self.after_run(adapter, res) finally: @@ -561,7 +562,6 @@ def run(self): ) if len(self._flattened_nodes) == 0: - fire_event(Formatting("")) warn_or_error(NothingToDo()) result = self.get_result( results=[], @@ -569,7 +569,6 @@ def run(self): elapsed_time=0.0, ) else: - fire_event(Formatting("")) selected_uids = frozenset(n.unique_id for n in self._flattened_nodes) result = self.execute_with_hooks(selected_uids)