diff --git a/.changes/unreleased/Fixes-20230221-170630.yaml b/.changes/unreleased/Fixes-20230221-170630.yaml new file mode 100644 index 00000000000..f4c08371cd1 --- /dev/null +++ b/.changes/unreleased/Fixes-20230221-170630.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Fix compilation logic for ephemeral nodes +time: 2023-02-21T17:06:30.218568-05:00 +custom: + Author: gshank + Issue: "6885" diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index 37761207d48..4f9119d4db5 100644 --- a/core/dbt/compilation.py +++ b/core/dbt/compilation.py @@ -90,10 +90,11 @@ def _generate_stats(manifest: Manifest): def _add_prepended_cte(prepended_ctes, new_cte): for cte in prepended_ctes: - if cte.id == new_cte.id: + if cte.id == new_cte.id and new_cte.sql: cte.sql = new_cte.sql return - prepended_ctes.append(new_cte) + if new_cte.sql: + prepended_ctes.append(new_cte) def _extend_prepended_ctes(prepended_ctes, new_prepended_ctes): @@ -257,16 +258,18 @@ def _recursively_prepend_ctes( inserting CTEs into the SQL. """ if model.compiled_code is None: - raise DbtRuntimeError("Cannot inject ctes into an unparsed node", model) + raise DbtRuntimeError("Cannot inject ctes into an uncompiled node", model) + + # extra_ctes_injected flag says that we've already recursively injected the ctes if model.extra_ctes_injected: return (model, model.extra_ctes) # Just to make it plain that nothing is actually injected for this case - if not model.extra_ctes: + if len(model.extra_ctes) == 0: + # SeedNodes don't have compilation attributes if not isinstance(model, SeedNode): model.extra_ctes_injected = True - manifest.update_node(model) - return (model, model.extra_ctes) + return (model, []) # This stores the ctes which will all be recursively # gathered and then "injected" into the model. @@ -275,7 +278,8 @@ def _recursively_prepend_ctes( # extra_ctes are added to the model by # RuntimeRefResolver.create_relation, which adds an # extra_cte for every model relation which is an - # ephemeral model. + # ephemeral model. InjectedCTEs have a unique_id and sql. + # extra_ctes start out with sql set to None, and the sql is set in this loop. for cte in model.extra_ctes: if cte.id not in manifest.nodes: raise DbtInternalError( @@ -288,23 +292,23 @@ def _recursively_prepend_ctes( if not cte_model.is_ephemeral_model: raise DbtInternalError(f"{cte.id} is not ephemeral") - # This model has already been compiled, so it's been - # through here before - if getattr(cte_model, "compiled", False): + # This model has already been compiled and extra_ctes_injected, so it's been + # through here before. We already checked above for extra_ctes_injected, but + # checking again because updates maybe have happened in another thread. + if cte_model.compiled is True and cte_model.extra_ctes_injected is True: new_prepended_ctes = cte_model.extra_ctes # if the cte_model isn't compiled, i.e. first time here else: # This is an ephemeral parsed model that we can compile. - # Compile and update the node - cte_model = self._compile_node(cte_model, manifest, extra_context) - # recursively call this method + # Render the raw_code and set compiled to True + cte_model = self._compile_code(cte_model, manifest, extra_context) + # recursively call this method, sets extra_ctes_injected to True cte_model, new_prepended_ctes = self._recursively_prepend_ctes( cte_model, manifest, extra_context ) - # Save compiled SQL file and sync manifest + # Write compiled SQL file self._write_node(cte_model) - manifest.sync_update_node(cte_model) _extend_prepended_ctes(prepended_ctes, new_prepended_ctes) @@ -318,20 +322,21 @@ def _recursively_prepend_ctes( model.compiled_code, prepended_ctes, ) - model._pre_injected_sql = model.compiled_code - model.compiled_code = injected_sql - model.extra_ctes_injected = True - model.extra_ctes = prepended_ctes - model.validate(model.to_dict(omit_none=True)) - manifest.update_node(model) + # Check again before updating for multi-threading + if not model.extra_ctes_injected: + model._pre_injected_sql = model.compiled_code + model.compiled_code = injected_sql + model.extra_ctes = prepended_ctes + model.extra_ctes_injected = True - return model, prepended_ctes + # if model.extra_ctes is not set to prepended ctes, something went wrong + return model, model.extra_ctes - # Sets compiled fields in the ManifestSQLNode passed in, + # Sets compiled_code and compiled flag in the ManifestSQLNode passed in, # creates a "context" dictionary for jinja rendering, # and then renders the "compiled_code" using the node, the # raw_code and the context. - def _compile_node( + def _compile_code( self, node: ManifestSQLNode, manifest: Manifest, @@ -340,16 +345,6 @@ def _compile_node( if extra_context is None: extra_context = {} - data = node.to_dict(omit_none=True) - data.update( - { - "compiled": False, - "compiled_code": None, - "extra_ctes_injected": False, - "extra_ctes": [], - } - ) - if node.language == ModelLanguage.python: # TODO could we also 'minify' this code at all? just aesthetic, not functional @@ -378,6 +373,8 @@ def _compile_node( node, ) + node.compiled = True + # relation_name is set at parse time, except for tests without store_failures, # but cli param can turn on store_failures, so we set here. if ( @@ -390,8 +387,6 @@ def _compile_node( relation_name = str(relation_cls.create_from(self.config, node)) node.relation_name = relation_name - node.compiled = True - return node def write_graph_file(self, linker: Linker, manifest: Manifest): @@ -522,11 +517,11 @@ def compile_node( ) -> ManifestSQLNode: """This is the main entry point into this code. It's called by CompileRunner.compile, GenericRPCRunner.compile, and - RunTask.get_hook_sql. It calls '_compile_node' to convert - the node into a compiled node, and then calls the + RunTask.get_hook_sql. It calls '_compile_code' to render + the node's raw_code into compiled_code, and then calls the recursive method to "prepend" the ctes. """ - node = self._compile_node(node, manifest, extra_context) + node = self._compile_code(node, manifest, extra_context) node, _ = self._recursively_prepend_ctes(node, manifest, extra_context) if write: diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index ac764e2bf32..6ad5c3e5583 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -646,24 +646,6 @@ def __post_deserialize__(cls, obj): obj._lock = flags.MP_CONTEXT.Lock() return obj - def sync_update_node(self, new_node: ManifestNode) -> ManifestNode: - """update the node with a lock. The only time we should want to lock is - when compiling an ephemeral ancestor of a node at runtime, because - multiple threads could be just-in-time compiling the same ephemeral - dependency, and we want them to have a consistent view of the manifest. - - If the existing node is not compiled, update it with the new node and - return that. If the existing node is compiled, do not update the - manifest and return the existing node. - """ - with self._lock: - existing = self.nodes[new_node.unique_id] - if getattr(existing, "compiled", False): - # already compiled - return existing - _update_into(self.nodes, new_node) - return new_node - def update_exposure(self, new_exposure: Exposure): _update_into(self.exposures, new_exposure) diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index ecc7027666c..516b7d9ba3f 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -410,8 +410,10 @@ def set_cte(self, cte_id: str, sql: str): do if extra_ctes were an OrderedDict """ for cte in self.extra_ctes: + # Because it's possible that multiple threads are compiling the + # node at the same time, we don't want to overwrite already compiled + # sql in the extra_ctes with empty sql. if cte.id == cte_id: - cte.sql = sql break else: self.extra_ctes.append(InjectedCTE(id=cte_id, sql=sql)) diff --git a/test/unit/test_compiler.py b/test/unit/test_compiler.py index 649a5918f91..28dd1ed32dc 100644 --- a/test/unit/test_compiler.py +++ b/test/unit/test_compiler.py @@ -303,30 +303,6 @@ def test__prepend_ctes__cte_not_compiled(self): raw_code='select * from source_table', checksum=FileHash.from_contents(''), ) - compiled_ephemeral = ModelNode( - name='ephemeral', - database='dbt', - schema='analytics', - alias='ephemeral', - resource_type=NodeType.Model, - unique_id='model.root.ephemeral', - fqn=['root', 'ephemeral'], - package_name='root', - refs=[], - sources=[], - depends_on=DependsOn(), - config=ephemeral_config, - tags=[], - path='ephemeral.sql', - original_file_path='ephemeral.sql', - language='sql', - raw_code='select * from source_table', - compiled=True, - compiled_code='select * from source_table', - extra_ctes_injected=True, - extra_ctes=[], - checksum=FileHash.from_contents(''), - ) manifest = Manifest( macros={}, nodes={ @@ -367,24 +343,17 @@ def test__prepend_ctes__cte_not_compiled(self): ) compiler = dbt.compilation.Compiler(self.config) - with patch.object(compiler, '_compile_node') as compile_node: - compile_node.return_value = compiled_ephemeral - - result, _ = compiler._recursively_prepend_ctes( - manifest.nodes['model.root.view'], - manifest, - {} - ) - compile_node.assert_called_once_with( - parsed_ephemeral, manifest, {}) - - self.assertEqual(result, - manifest.nodes.get('model.root.view')) + node = compiler.compile_node( + manifest.nodes['model.root.view'], + manifest, + {}, + False + ) self.assertTrue(manifest.nodes['model.root.ephemeral'].compiled) - self.assertTrue(result.extra_ctes_injected) + self.assertTrue(node.extra_ctes_injected) self.assertEqualIgnoreWhitespace( - result.compiled_code, + node.compiled_code, ('with __dbt__cte__ephemeral as (' 'select * from source_table' ') ' diff --git a/tests/functional/materializations/test_ephemeral_compilation.py b/tests/functional/materializations/test_ephemeral_compilation.py new file mode 100644 index 00000000000..56f49928756 --- /dev/null +++ b/tests/functional/materializations/test_ephemeral_compilation.py @@ -0,0 +1,71 @@ +import pytest +from dbt.tests.util import run_dbt + +# Note: This tests compilation only, so is a dbt Core test and not an adapter test. +# There is some complicated logic in core/dbt/compilation.py having to do with +# ephemeral nodes and handling multiple threads at the same time. This test +# fails fairly regularly if that is broken, but does occasionally work (depending +# on the order in which things are compiled). It requires multi-threading to fail. + + +fct_eph_first_sql = """ +-- fct_eph_first.sql +{{ config(materialized='ephemeral') }} + +with int_eph_first as( + select * from {{ ref('int_eph_first') }} +) + +select * from int_eph_first +""" + +int_eph_first_sql = """ +-- int_eph_first.sql +{{ config(materialized='ephemeral') }} + +select + 1 as first_column, + 2 as second_column +""" + +schema_yml = """ +version: 2 + +models: + - name: int_eph_first + columns: + - name: first_column + tests: + - not_null + - name: second_column + tests: + - not_null + + - name: fct_eph_first + columns: + - name: first_column + tests: + - not_null + - name: second_column + tests: + - not_null + +""" + + +class TestEphemeralCompilation: + @pytest.fixture(scope="class") + def models(self): + return { + "int_eph_first.sql": int_eph_first_sql, + "fct_eph_first.sql": fct_eph_first_sql, + "schema.yml": schema_yml, + } + + def test_ephemeral_compilation(self, project): + # Note: There are no models that run successfully. This testcase tests running tests. + results = run_dbt(["run"]) + assert len(results) == 0 + + results = run_dbt(["test"]) + len(results) == 4