Skip to content

Commit

Permalink
some cleanup, some bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
emmyoop committed May 10, 2024
1 parent c3e95c2 commit 4ecef69
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 66 deletions.
2 changes: 1 addition & 1 deletion core/dbt/artifacts/schemas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def inner(cls: Type[VersionedSchema]):
return inner


# This is used in the ArtifactMixin and RemoteResult classes
# This is used in the ArtifactMixin and RemoteCompileResultMixin classes
@dataclasses.dataclass
class VersionedSchema(dbtClassMixin):
dbt_schema_version: ClassVar[SchemaVersion]
Expand Down
3 changes: 1 addition & 2 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1071,9 +1071,8 @@ def load_macros(
# and then throws it away, returning only the
# manifest
loader = cls(root_config, projects, macro_hook)
macro_manifest = loader.create_macro_manifest()

return macro_manifest
return loader.create_macro_manifest()

# Create tracking event for saving performance info
def track_project_load(self):
Expand Down
44 changes: 22 additions & 22 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,29 +368,29 @@ def run_hooks(self, adapter, hook_type: RunHookType, extra_context) -> None:
)
)

with Timer() as timer:
if len(sql.strip()) > 0:
response, _ = adapter.execute(sql, auto_begin=False, fetch=False)
status = response._message
else:
status = "OK"

self.ran_hooks.append(hook)
hook.update_event_status(finished_at=datetime.utcnow().isoformat())
hook.update_event_status(node_status=RunStatus.Success)
fire_event(
LogHookEndLine(
statement=hook_text,
status=status,
index=idx,
total=num_hooks,
execution_time=timer.elapsed,
node_info=hook.node_info,
with Timer() as timer:
if len(sql.strip()) > 0:
response, _ = adapter.execute(sql, auto_begin=False, fetch=False)
status = response._message
else:
status = "OK"

self.ran_hooks.append(hook)
hook.update_event_status(finished_at=datetime.utcnow().isoformat())
hook.update_event_status(node_status=RunStatus.Success)
fire_event(
LogHookEndLine(
statement=hook_text,
status=status,
index=idx,
total=num_hooks,
execution_time=timer.elapsed,
node_info=hook.node_info,
)
)
)
# `_event_status` dict is only used for logging. Make sure
# it gets deleted when we're done with it
hook.clear_event_status()
# `_event_status` dict is only used for logging. Make sure
# it gets deleted when we're done with it
hook.clear_event_status()

self._total_executed += len(ordered_hooks)

Expand Down
83 changes: 42 additions & 41 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from dbt.parser.manifest import write_manifest
from dbt.task.base import BaseRunner, ConfiguredTask
from dbt_common.context import _INVOCATION_CONTEXT_VAR, get_invocation_context
from dbt_common.events.contextvars import task_contextvars
from dbt_common.events.contextvars import log_contextvars, task_contextvars
from dbt_common.events.functions import fire_event, warn_or_error
from dbt_common.events.types import Formatting
from dbt_common.exceptions import NotImplementedError
Expand Down Expand Up @@ -188,51 +188,52 @@ def get_runner(self, node) -> BaseRunner:
return cls(self.config, adapter, node, run_count, num_nodes)

def call_runner(self, runner: BaseRunner) -> RunResult:
runner.node.update_event_status(
started_at=datetime.utcnow().isoformat(), node_status=RunningStatus.Started
)
fire_event(
NodeStart(
node_info=runner.node.node_info,
with log_contextvars(node_info=runner.node.node_info):
runner.node.update_event_status(
started_at=datetime.utcnow().isoformat(), node_status=RunningStatus.Started
)
)
try:
result = runner.run_with_hooks(self.manifest)
except Exception as e:
thread_exception = e
finally:
if result is not None:
fire_event(
NodeFinished(
node_info=runner.node.node_info,
run_result=result.to_msg_dict(),
)
fire_event(
NodeStart(
node_info=runner.node.node_info,
)
else:
msg = f"Exception on worker thread. {thread_exception}"

fire_event(
GenericExceptionOnRun(
unique_id=runner.node.unique_id,
exc=str(thread_exception),
node_info=runner.node.node_info,
)
try:
result = runner.run_with_hooks(self.manifest)
except Exception as e:
thread_exception = e
finally:
if result is not None:
fire_event(
NodeFinished(
node_info=runner.node.node_info,
run_result=result.to_msg_dict(),
)
)
else:
msg = f"Exception on worker thread. {thread_exception}"

Check warning on line 213 in core/dbt/task/runnable.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/runnable.py#L213

Added line #L213 was not covered by tests

fire_event(

Check warning on line 215 in core/dbt/task/runnable.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/runnable.py#L215

Added line #L215 was not covered by tests
GenericExceptionOnRun(
unique_id=runner.node.unique_id,
exc=str(thread_exception),
node_info=runner.node.node_info,
)
)
)

result = RunResult(
status=RunStatus.Error, # type: ignore
timing=[],
thread_id="",
execution_time=0.0,
adapter_response={},
message=msg,
failures=None,
node=runner.node,
)
result = RunResult(

Check warning on line 223 in core/dbt/task/runnable.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/runnable.py#L223

Added line #L223 was not covered by tests
status=RunStatus.Error, # type: ignore
timing=[],
thread_id="",
execution_time=0.0,
adapter_response={},
message=msg,
failures=None,
node=runner.node,
)

# `_event_status` dict is only used for logging. Make sure
# it gets deleted when we're done with it
runner.node.clear_event_status()
# `_event_status` dict is only used for logging. Make sure
# it gets deleted when we're done with it
runner.node.clear_event_status()

fail_fast = get_flags().FAIL_FAST

Expand Down

0 comments on commit 4ecef69

Please sign in to comment.