Skip to content

Commit

Permalink
CT 2057 Fix compilation logic for ephemeral nodes (#7023) (#7037)
Browse files Browse the repository at this point in the history
* Don't overwrite sql in extra_ctes when compiling (rendering) nodes

(cherry picked from commit 70c26f5)

Co-authored-by: Gerda Shank <[email protected]>
Co-authored-by: Jeremy Cohen <[email protected]>
  • Loading branch information
3 people authored Mar 1, 2023
1 parent 57a6963 commit 0c06735
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 97 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230221-170630.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix compilation logic for ephemeral nodes
time: 2023-02-21T17:06:30.218568-05:00
custom:
Author: gshank
Issue: "6885"
73 changes: 34 additions & 39 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ def _generate_stats(manifest: Manifest):

def _add_prepended_cte(prepended_ctes, new_cte):
for cte in prepended_ctes:
if cte.id == new_cte.id:
if cte.id == new_cte.id and new_cte.sql:
cte.sql = new_cte.sql
return
prepended_ctes.append(new_cte)
if new_cte.sql:
prepended_ctes.append(new_cte)


def _extend_prepended_ctes(prepended_ctes, new_prepended_ctes):
Expand Down Expand Up @@ -257,16 +258,18 @@ def _recursively_prepend_ctes(
inserting CTEs into the SQL.
"""
if model.compiled_code is None:
raise DbtRuntimeError("Cannot inject ctes into an unparsed node", model)
raise DbtRuntimeError("Cannot inject ctes into an uncompiled node", model)

# extra_ctes_injected flag says that we've already recursively injected the ctes
if model.extra_ctes_injected:
return (model, model.extra_ctes)

# Just to make it plain that nothing is actually injected for this case
if not model.extra_ctes:
if len(model.extra_ctes) == 0:
# SeedNodes don't have compilation attributes
if not isinstance(model, SeedNode):
model.extra_ctes_injected = True
manifest.update_node(model)
return (model, model.extra_ctes)
return (model, [])

# This stores the ctes which will all be recursively
# gathered and then "injected" into the model.
Expand All @@ -275,7 +278,8 @@ def _recursively_prepend_ctes(
# extra_ctes are added to the model by
# RuntimeRefResolver.create_relation, which adds an
# extra_cte for every model relation which is an
# ephemeral model.
# ephemeral model. InjectedCTEs have a unique_id and sql.
# extra_ctes start out with sql set to None, and the sql is set in this loop.
for cte in model.extra_ctes:
if cte.id not in manifest.nodes:
raise DbtInternalError(
Expand All @@ -288,23 +292,23 @@ def _recursively_prepend_ctes(
if not cte_model.is_ephemeral_model:
raise DbtInternalError(f"{cte.id} is not ephemeral")

# This model has already been compiled, so it's been
# through here before
if getattr(cte_model, "compiled", False):
# This model has already been compiled and extra_ctes_injected, so it's been
# through here before. We already checked above for extra_ctes_injected, but
# checking again because updates maybe have happened in another thread.
if cte_model.compiled is True and cte_model.extra_ctes_injected is True:
new_prepended_ctes = cte_model.extra_ctes

# if the cte_model isn't compiled, i.e. first time here
else:
# This is an ephemeral parsed model that we can compile.
# Compile and update the node
cte_model = self._compile_node(cte_model, manifest, extra_context)
# recursively call this method
# Render the raw_code and set compiled to True
cte_model = self._compile_code(cte_model, manifest, extra_context)
# recursively call this method, sets extra_ctes_injected to True
cte_model, new_prepended_ctes = self._recursively_prepend_ctes(
cte_model, manifest, extra_context
)
# Save compiled SQL file and sync manifest
# Write compiled SQL file
self._write_node(cte_model)
manifest.sync_update_node(cte_model)

_extend_prepended_ctes(prepended_ctes, new_prepended_ctes)

Expand All @@ -318,20 +322,21 @@ def _recursively_prepend_ctes(
model.compiled_code,
prepended_ctes,
)
model._pre_injected_sql = model.compiled_code
model.compiled_code = injected_sql
model.extra_ctes_injected = True
model.extra_ctes = prepended_ctes
model.validate(model.to_dict(omit_none=True))
manifest.update_node(model)
# Check again before updating for multi-threading
if not model.extra_ctes_injected:
model._pre_injected_sql = model.compiled_code
model.compiled_code = injected_sql
model.extra_ctes = prepended_ctes
model.extra_ctes_injected = True

return model, prepended_ctes
# if model.extra_ctes is not set to prepended ctes, something went wrong
return model, model.extra_ctes

# Sets compiled fields in the ManifestSQLNode passed in,
# Sets compiled_code and compiled flag in the ManifestSQLNode passed in,
# creates a "context" dictionary for jinja rendering,
# and then renders the "compiled_code" using the node, the
# raw_code and the context.
def _compile_node(
def _compile_code(
self,
node: ManifestSQLNode,
manifest: Manifest,
Expand All @@ -340,16 +345,6 @@ def _compile_node(
if extra_context is None:
extra_context = {}

data = node.to_dict(omit_none=True)
data.update(
{
"compiled": False,
"compiled_code": None,
"extra_ctes_injected": False,
"extra_ctes": [],
}
)

if node.language == ModelLanguage.python:
# TODO could we also 'minify' this code at all? just aesthetic, not functional

Expand Down Expand Up @@ -378,6 +373,8 @@ def _compile_node(
node,
)

node.compiled = True

# relation_name is set at parse time, except for tests without store_failures,
# but cli param can turn on store_failures, so we set here.
if (
Expand All @@ -390,8 +387,6 @@ def _compile_node(
relation_name = str(relation_cls.create_from(self.config, node))
node.relation_name = relation_name

node.compiled = True

return node

def write_graph_file(self, linker: Linker, manifest: Manifest):
Expand Down Expand Up @@ -522,11 +517,11 @@ def compile_node(
) -> ManifestSQLNode:
"""This is the main entry point into this code. It's called by
CompileRunner.compile, GenericRPCRunner.compile, and
RunTask.get_hook_sql. It calls '_compile_node' to convert
the node into a compiled node, and then calls the
RunTask.get_hook_sql. It calls '_compile_code' to render
the node's raw_code into compiled_code, and then calls the
recursive method to "prepend" the ctes.
"""
node = self._compile_node(node, manifest, extra_context)
node = self._compile_code(node, manifest, extra_context)

node, _ = self._recursively_prepend_ctes(node, manifest, extra_context)
if write:
Expand Down
18 changes: 0 additions & 18 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,24 +646,6 @@ def __post_deserialize__(cls, obj):
obj._lock = flags.MP_CONTEXT.Lock()
return obj

def sync_update_node(self, new_node: ManifestNode) -> ManifestNode:
"""update the node with a lock. The only time we should want to lock is
when compiling an ephemeral ancestor of a node at runtime, because
multiple threads could be just-in-time compiling the same ephemeral
dependency, and we want them to have a consistent view of the manifest.
If the existing node is not compiled, update it with the new node and
return that. If the existing node is compiled, do not update the
manifest and return the existing node.
"""
with self._lock:
existing = self.nodes[new_node.unique_id]
if getattr(existing, "compiled", False):
# already compiled
return existing
_update_into(self.nodes, new_node)
return new_node

def update_exposure(self, new_exposure: Exposure):
_update_into(self.exposures, new_exposure)

Expand Down
4 changes: 3 additions & 1 deletion core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,10 @@ def set_cte(self, cte_id: str, sql: str):
do if extra_ctes were an OrderedDict
"""
for cte in self.extra_ctes:
# Because it's possible that multiple threads are compiling the
# node at the same time, we don't want to overwrite already compiled
# sql in the extra_ctes with empty sql.
if cte.id == cte_id:
cte.sql = sql
break
else:
self.extra_ctes.append(InjectedCTE(id=cte_id, sql=sql))
Expand Down
47 changes: 8 additions & 39 deletions test/unit/test_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,30 +303,6 @@ def test__prepend_ctes__cte_not_compiled(self):
raw_code='select * from source_table',
checksum=FileHash.from_contents(''),
)
compiled_ephemeral = ModelNode(
name='ephemeral',
database='dbt',
schema='analytics',
alias='ephemeral',
resource_type=NodeType.Model,
unique_id='model.root.ephemeral',
fqn=['root', 'ephemeral'],
package_name='root',
refs=[],
sources=[],
depends_on=DependsOn(),
config=ephemeral_config,
tags=[],
path='ephemeral.sql',
original_file_path='ephemeral.sql',
language='sql',
raw_code='select * from source_table',
compiled=True,
compiled_code='select * from source_table',
extra_ctes_injected=True,
extra_ctes=[],
checksum=FileHash.from_contents(''),
)
manifest = Manifest(
macros={},
nodes={
Expand Down Expand Up @@ -367,24 +343,17 @@ def test__prepend_ctes__cte_not_compiled(self):
)

compiler = dbt.compilation.Compiler(self.config)
with patch.object(compiler, '_compile_node') as compile_node:
compile_node.return_value = compiled_ephemeral

result, _ = compiler._recursively_prepend_ctes(
manifest.nodes['model.root.view'],
manifest,
{}
)
compile_node.assert_called_once_with(
parsed_ephemeral, manifest, {})

self.assertEqual(result,
manifest.nodes.get('model.root.view'))
node = compiler.compile_node(
manifest.nodes['model.root.view'],
manifest,
{},
False
)

self.assertTrue(manifest.nodes['model.root.ephemeral'].compiled)
self.assertTrue(result.extra_ctes_injected)
self.assertTrue(node.extra_ctes_injected)
self.assertEqualIgnoreWhitespace(
result.compiled_code,
node.compiled_code,
('with __dbt__cte__ephemeral as ('
'select * from source_table'
') '
Expand Down
71 changes: 71 additions & 0 deletions tests/functional/materializations/test_ephemeral_compilation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import pytest
from dbt.tests.util import run_dbt

# Note: This tests compilation only, so is a dbt Core test and not an adapter test.
# There is some complicated logic in core/dbt/compilation.py having to do with
# ephemeral nodes and handling multiple threads at the same time. This test
# fails fairly regularly if that is broken, but does occasionally work (depending
# on the order in which things are compiled). It requires multi-threading to fail.


fct_eph_first_sql = """
-- fct_eph_first.sql
{{ config(materialized='ephemeral') }}
with int_eph_first as(
select * from {{ ref('int_eph_first') }}
)
select * from int_eph_first
"""

int_eph_first_sql = """
-- int_eph_first.sql
{{ config(materialized='ephemeral') }}
select
1 as first_column,
2 as second_column
"""

schema_yml = """
version: 2
models:
- name: int_eph_first
columns:
- name: first_column
tests:
- not_null
- name: second_column
tests:
- not_null
- name: fct_eph_first
columns:
- name: first_column
tests:
- not_null
- name: second_column
tests:
- not_null
"""


class TestEphemeralCompilation:
@pytest.fixture(scope="class")
def models(self):
return {
"int_eph_first.sql": int_eph_first_sql,
"fct_eph_first.sql": fct_eph_first_sql,
"schema.yml": schema_yml,
}

def test_ephemeral_compilation(self, project):
# Note: There are no models that run successfully. This testcase tests running tests.
results = run_dbt(["run"])
assert len(results) == 0

results = run_dbt(["test"])
len(results) == 4

0 comments on commit 0c06735

Please sign in to comment.