Skip to content

Commit

Permalink
Merge branch 'main' into ct-2327-model_publication
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank committed Apr 24, 2023
2 parents 719ab9a + d07603b commit 41364be
Showing 12 changed files with 162 additions and 17 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230413-133157.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Persist timing info in run results for failed nodes
time: 2023-04-13T13:31:57.938847-05:00
custom:
Author: stu-k
Issue: "5476"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230420-104254.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Add --target-path to dbt snapshot command.
time: 2023-04-20T10:42:54.17972-04:00
custom:
Author: dwreeves
Issue: "7418"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20230417-122721.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Reduce memory footprint of cached statement results.
time: 2023-04-17T12:27:21.972268-05:00
custom:
Author: iknox-fa
Issue: "7281"
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ __pycache__/
env*/
dbt_env/
build/
!tests/functional/build
!core/dbt/docs/build
develop-eggs/
dist/
1 change: 1 addition & 0 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
@@ -651,6 +651,7 @@ def seed(ctx, **kwargs):
@p.state
@p.deprecated_state
@p.target
@p.target_path
@p.threads
@p.vars
@requires.postflight
24 changes: 21 additions & 3 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@
LoadAgateTableNotSeedError,
LoadAgateTableValueError,
MacroDispatchArgError,
MacroResultAlreadyLoadedError,
MacrosSourcesUnWriteableError,
MetricArgsError,
MissingConfigError,
@@ -710,7 +711,7 @@ def __init__(
self.config: RuntimeConfig
self.model: Union[Macro, ManifestNode] = model
super().__init__(config, manifest, model.package_name)
self.sql_results: Dict[str, AttrDict] = {}
self.sql_results: Dict[str, Optional[AttrDict]] = {}
self.context_config: Optional[ContextConfig] = context_config
self.provider: Provider = provider
self.adapter = get_adapter(self.config)
@@ -738,12 +739,29 @@ def invocation_args_dict(self):
return args_to_dict(self.config.args)

@contextproperty
def _sql_results(self) -> Dict[str, AttrDict]:
def _sql_results(self) -> Dict[str, Optional[AttrDict]]:
return self.sql_results

@contextmember
def load_result(self, name: str) -> Optional[AttrDict]:
return self.sql_results.get(name)
if name in self.sql_results:
# handle the special case of "main" macro
# See: https://github.com/dbt-labs/dbt-core/blob/ada8860e48b32ac712d92e8b0977b2c3c9749981/core/dbt/task/run.py#L228
if name == "main":
return self.sql_results["main"]

# handle a None, which indicates this name was populated but has since been loaded
elif self.sql_results[name] is None:
raise MacroResultAlreadyLoadedError(name)

# Handle the regular use case
else:
ret_val = self.sql_results[name]
self.sql_results[name] = None
return ret_val
else:
# Handle trying to load a result that was never stored
return None

@contextmember
def store_result(
12 changes: 7 additions & 5 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
@@ -21,13 +21,14 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import (
Union,
Any,
Callable,
Dict,
List,
Optional,
Any,
NamedTuple,
Optional,
Sequence,
Union,
)

from dbt.clients.system import write_json
@@ -56,15 +57,16 @@ def to_msg_dict(self):

# This is a context manager
class collect_timing_info:
def __init__(self, name: str):
def __init__(self, name: str, callback: Callable[[TimingInfo], None]):
self.timing_info = TimingInfo(name=name)
self.callback = callback

def __enter__(self):
self.timing_info.begin()
return self.timing_info

def __exit__(self, exc_type, exc_value, traceback):
self.timing_info.end()
self.callback(self.timing_info)
# Note: when legacy logger is removed, we can remove the following line
with TimingProcessor(self.timing_info):
fire_event(
11 changes: 11 additions & 0 deletions core/dbt/exceptions.py
Original file line number Diff line number Diff line change
@@ -1033,6 +1033,17 @@ def get_message(self) -> str:
return msg


class MacroResultAlreadyLoadedError(CompilationError):
def __init__(self, result_name):
self.result_name = result_name
super().__init__(msg=self.get_message())

def get_message(self) -> str:
msg = f"The 'statement' result named '{self.result_name}' has already been loaded into a variable"

return msg


# parser level exceptions
class DictParseError(ParsingError):
def __init__(self, exc: ValidationError, node):
10 changes: 3 additions & 7 deletions core/dbt/task/base.py
Original file line number Diff line number Diff line change
@@ -318,12 +318,11 @@ def compile_and_execute(self, manifest, ctx):
node_info=ctx.node.node_info,
)
)
with collect_timing_info("compile") as timing_info:
with collect_timing_info("compile", ctx.timing.append):
# if we fail here, we still have a compiled node to return
# this has the benefit of showing a build path for the errant
# model
ctx.node = self.compile(manifest)
ctx.timing.append(timing_info)

# for ephemeral nodes, we only want to compile, not run
if not ctx.node.is_ephemeral_model or self.run_ephemeral_models:
@@ -333,12 +332,10 @@ def compile_and_execute(self, manifest, ctx):
node_info=ctx.node.node_info,
)
)
with collect_timing_info("execute") as timing_info:
with collect_timing_info("execute", ctx.timing.append):
result = self.run(ctx.node, manifest)
ctx.node = result.node

ctx.timing.append(timing_info)

return result

def _handle_catchable_exception(self, e, ctx):
@@ -402,8 +399,7 @@ def safe_run(self, manifest):
error = exc_str

if error is not None:
# we could include compile time for runtime errors here
result = self.error_result(ctx.node, error, started, [])
result = self.error_result(ctx.node, error, started, ctx.timing)
elif result is not None:
result = self.from_run_result(result, started, ctx.timing)
else:
32 changes: 32 additions & 0 deletions tests/functional/artifacts/test_run_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import pytest
from dbt.tests.util import run_dbt

good_model_sql = """
select 1 as id
"""

bad_model_sql = """
something bad
"""


class TestRunResultsTimingSuccess:
@pytest.fixture(scope="class")
def models(self):
return {"model.sql": good_model_sql}

def test_timing_exists(self, project):
results = run_dbt(["run"])
assert len(results.results) == 1
assert len(results.results[0].timing) > 0


class TestRunResultsTimingFailure:
@pytest.fixture(scope="class")
def models(self):
return {"model.sql": bad_model_sql}

def test_timing_exists(self, project):
results = run_dbt(["run"], expect_pass=False)
assert len(results.results) == 1
assert len(results.results[0].timing) > 0
44 changes: 44 additions & 0 deletions tests/functional/statements/fixtures.py
Original file line number Diff line number Diff line change
@@ -136,3 +136,47 @@
union all
select 'table' as source, {{ table_value }} as value
"""

models__statement_duplicated_load = """
-- {{ ref('seed') }}
{%- call statement('test_statement', fetch_result=True) -%}
select
count(*) as "num_records"
from {{ ref('seed') }}
{%- endcall -%}
{% set result = load_result('test_statement') %}
{% set result = load_result('test_statement') %}
select 1
"""

models__statement_load_main_twice = """
-- {{ ref('seed') }}
{%- call statement('main', fetch_result=True) -%}
select
count(*) as "num_records"
from {{ ref('seed') }}
{%- endcall -%}
{% set result = load_result('main') %}
{% set result = load_result('main') %}
{% set res_table = result['table'] %}
{% set res_matrix = result['data'] %}
{% set matrix_value = res_matrix[0][0] %}
{% set table_value = res_table[0]['num_records'] %}
select 'matrix' as source, {{ matrix_value }} as value
union all
select 'table' as source, {{ table_value }} as value
"""
26 changes: 24 additions & 2 deletions tests/functional/statements/test_statements.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,8 @@
from dbt.tests.util import run_dbt, check_relations_equal, write_file
from tests.functional.statements.fixtures import (
models__statement_actual,
models__statement_duplicated_load,
models__statement_load_main_twice,
seeds__statement_actual,
seeds__statement_expected,
)
@@ -21,7 +23,11 @@ def setUp(self, project):

@pytest.fixture(scope="class")
def models(self):
return {"statement_actual.sql": models__statement_actual}
return {
"statement_actual.sql": models__statement_actual,
"statement_duplicated_load.sql": models__statement_duplicated_load,
"statement_load_main_twice.sql": models__statement_load_main_twice,
}

@pytest.fixture(scope="class")
def project_config_update(self):
@@ -35,7 +41,23 @@ def project_config_update(self):
def test_postgres_statements(self, project):
results = run_dbt(["seed"])
assert len(results) == 2
results = run_dbt()
results = run_dbt(["run", "-m", "statement_actual"])
assert len(results) == 1

check_relations_equal(project.adapter, ["statement_actual", "statement_expected"])

def test_duplicated_load_statements(self, project):
run_dbt(["seed"])
results = run_dbt(["run", "-m", "statement_duplicated_load"], False)
assert len(results) == 1
assert results.results[0].status == "error"
assert (
"The 'statement' result named 'test_statement' has already been loaded into a variable"
in results.results[0].message
)

def test_load_statement_on_main_twice(self, project):
run_dbt(["seed"])
results = run_dbt(["run", "-m", "statement_load_main_twice"])
assert len(results) == 1
check_relations_equal(project.adapter, ["statement_load_main_twice", "statement_expected"])

0 comments on commit 41364be

Please sign in to comment.