Remove unnecessary IntermediateSnapshotNode and EmptySnapshotConfig (#…
gshank authored Jun 19, 2024
1 parent 4415731 commit d389ff1
Showing 6 changed files with 33 additions and 154 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240618-140652.yaml
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Remove IntermediateSnapshotNode
time: 2024-06-18T14:06:52.618602-04:00
custom:
Author: gshank
Issue: "10326"
7 changes: 0 additions & 7 deletions core/dbt/contracts/graph/model_config.py
@@ -39,12 +39,6 @@ class UnitTestNodeConfig(NodeConfig):
expected_sql: Optional[str] = None


@dataclass
class EmptySnapshotConfig(NodeConfig):
materialized: str = "snapshot"
unique_key: Optional[str] = None # override NodeConfig unique_key definition


RESOURCE_TYPES: Dict[NodeType, Type[BaseConfig]] = {
NodeType.Metric: MetricConfig,
NodeType.SemanticModel: SemanticModelConfig,
@@ -62,7 +56,6 @@ class EmptySnapshotConfig(NodeConfig):
# base resource types are like resource types, except nothing has mandatory
# configs.
BASE_RESOURCE_TYPES: Dict[NodeType, Type[BaseConfig]] = RESOURCE_TYPES.copy()
BASE_RESOURCE_TYPES.update({NodeType.Snapshot: EmptySnapshotConfig})


def get_config_for(resource_type: NodeType, base=False) -> Type[BaseConfig]:
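
With the EmptySnapshotConfig override removed, BASE_RESOURCE_TYPES is now an unmodified copy of RESOURCE_TYPES, so the base and regular config lookups for snapshots resolve to the same class. A minimal sketch of that consequence, assuming get_config_for consults BASE_RESOURCE_TYPES when base=True (its body lies outside this hunk):

# Minimal sketch, not part of this commit. Assumes get_config_for(base=True)
# reads BASE_RESOURCE_TYPES, which after this change is a plain copy of
# RESOURCE_TYPES with no snapshot-specific override.
from dbt.contracts.graph.model_config import get_config_for
from dbt.node_types import NodeType

# Previously base=True resolved snapshots to EmptySnapshotConfig; now both
# lookups return the same snapshot config class.
assert get_config_for(NodeType.Snapshot, base=True) is get_config_for(
    NodeType.Snapshot, base=False
)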
15 changes: 1 addition & 14 deletions core/dbt/contracts/graph/nodes.py
@@ -59,7 +59,7 @@
from dbt.artifacts.resources import SourceDefinition as SourceDefinitionResource
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.contracts.graph.model_config import EmptySnapshotConfig, UnitTestNodeConfig
from dbt.contracts.graph.model_config import UnitTestNodeConfig
from dbt.contracts.graph.node_args import ModelNodeArgs
from dbt.contracts.graph.unparsed import (
HasYamlMetadata,
@@ -1041,19 +1041,6 @@ class UnitTestFileFixture(BaseNode):
# ====================================


@dataclass
class IntermediateSnapshotNode(CompiledNode):
# at an intermediate stage in parsing, where we've built something better
# than an unparsed node for rendering in parse mode, it's pretty possible
# that we won't have critical snapshot-related information that is only
# defined in config blocks. To fix that, we have an intermediate type that
# uses a regular node config, which the snapshot parser will then convert
# into a full ParsedSnapshotNode after rendering. Note: it currently does
# not work to set snapshot config in schema files because of the validation.
resource_type: Literal[NodeType.Snapshot]
config: EmptySnapshotConfig = field(default_factory=EmptySnapshotConfig)


@dataclass
class SnapshotNode(SnapshotResource, CompiledNode):
@classmethod
35 changes: 18 additions & 17 deletions core/dbt/parser/base.py
@@ -32,7 +32,6 @@
FinalValue = TypeVar("FinalValue", bound=BaseNode)
IntermediateValue = TypeVar("IntermediateValue", bound=BaseNode)

IntermediateNode = TypeVar("IntermediateNode", bound=Any)
FinalNode = TypeVar("FinalNode", bound=ManifestNode)


@@ -118,7 +117,7 @@ def __call__(self, parsed_node: Any, override: Optional[str]) -> None:

class ConfiguredParser(
Parser[FinalNode],
Generic[ConfiguredBlockType, IntermediateNode, FinalNode],
Generic[ConfiguredBlockType, FinalNode],
):
def __init__(
self,
@@ -144,7 +143,7 @@ def get_compiled_path(cls, block: ConfiguredBlockType) -> str:
pass

@abc.abstractmethod
def parse_from_dict(self, dict, validate=True) -> IntermediateNode:
def parse_from_dict(self, dict, validate=True) -> FinalNode:
pass

@abc.abstractproperty
@@ -208,7 +207,7 @@ def _create_parsetime_node(
fqn: List[str],
name=None,
**kwargs,
) -> IntermediateNode:
) -> FinalNode:
"""Create the node that will be passed in to the parser context for
"rendering". Some information may be partial, as it'll be updated by
config() and any ref()/source() calls discovered during rendering.
@@ -253,10 +252,10 @@ def _create_parsetime_node(
)
raise DictParseError(exc, node=node)

def _context_for(self, parsed_node: IntermediateNode, config: ContextConfig) -> Dict[str, Any]:
def _context_for(self, parsed_node: FinalNode, config: ContextConfig) -> Dict[str, Any]:
return generate_parser_model_context(parsed_node, self.root_project, self.manifest, config)

def render_with_context(self, parsed_node: IntermediateNode, config: ContextConfig):
def render_with_context(self, parsed_node: FinalNode, config: ContextConfig):
# Given the parsed node and a ContextConfig to use during parsing,
# render the node's sql with macro capture enabled.
# Note: this mutates the config object when config calls are rendered.
@@ -271,7 +270,7 @@ def render_with_context(self, parsed_node: IntermediateNode, config: ContextConf
# updating the config with new config passed in, then re-creating the
# config from the dict in the node.
def update_parsed_node_config_dict(
self, parsed_node: IntermediateNode, config_dict: Dict[str, Any]
self, parsed_node: FinalNode, config_dict: Dict[str, Any]
) -> None:
# Overwrite node config
final_config_dict = parsed_node.config.to_dict(omit_none=True)
@@ -281,7 +280,7 @@ def update_parsed_node_config_dict(
parsed_node.config = parsed_node.config.from_dict(final_config_dict)

def update_parsed_node_relation_names(
self, parsed_node: IntermediateNode, config_dict: Dict[str, Any]
self, parsed_node: FinalNode, config_dict: Dict[str, Any]
) -> None:

# These call the RelationUpdate callable to go through generate_name macros
@@ -300,7 +299,7 @@ def update_parsed_node_relation_names(

def update_parsed_node_config(
self,
parsed_node: IntermediateNode,
parsed_node: FinalNode,
config: ContextConfig,
context=None,
patch_config_dict=None,
@@ -334,6 +333,7 @@ def update_parsed_node_config(
# If we have access in the config, copy to node level
if parsed_node.resource_type == NodeType.Model and config_dict.get("access", None):
if AccessType.is_valid(config_dict["access"]):
assert hasattr(parsed_node, "access")
parsed_node.access = AccessType(config_dict["access"])
else:
raise InvalidAccessTypeError(
@@ -360,7 +360,9 @@ def update_parsed_node_config(
if "contract" in config_dict and config_dict["contract"]:
contract_dct = config_dict["contract"]
Contract.validate(contract_dct)
parsed_node.contract = Contract.from_dict(contract_dct)
# Seed node has contract config (from NodeConfig) but no contract in SeedNode
if hasattr(parsed_node, "contract"):
parsed_node.contract = Contract.from_dict(contract_dct)

# unrendered_config is used to compare the original database/schema/alias
# values and to handle 'same_config' and 'same_contents' calls
@@ -382,6 +384,7 @@

# at this point, we've collected our hooks. Use the node context to
# render each hook and collect refs/sources
assert hasattr(parsed_node.config, "pre_hook") and hasattr(parsed_node.config, "post_hook")
hooks = list(itertools.chain(parsed_node.config.pre_hook, parsed_node.config.post_hook))
# skip context rebuilding if there aren't any hooks
if not hooks:
@@ -413,7 +416,7 @@ def config_dict(
self._mangle_hooks(config_dict)
return config_dict

def render_update(self, node: IntermediateNode, config: ContextConfig) -> None:
def render_update(self, node: FinalNode, config: ContextConfig) -> None:
try:
context = self.render_with_context(node, config)
self.update_parsed_node_config(node, config, context=context)
@@ -462,25 +465,23 @@ def parse_file(self, file_block: FileBlock) -> None:
pass

@abc.abstractmethod
def transform(self, node: IntermediateNode) -> FinalNode:
def transform(self, node: FinalNode) -> FinalNode:
pass


class SimpleParser(
ConfiguredParser[ConfiguredBlockType, FinalNode, FinalNode],
ConfiguredParser[ConfiguredBlockType, FinalNode],
Generic[ConfiguredBlockType, FinalNode],
):
def transform(self, node):
return node


class SQLParser(
ConfiguredParser[FileBlock, IntermediateNode, FinalNode], Generic[IntermediateNode, FinalNode]
):
class SQLParser(ConfiguredParser[FileBlock, FinalNode], Generic[FinalNode]):
def parse_file(self, file_block: FileBlock) -> None:
self.parse_node(file_block)


class SimpleSQLParser(SQLParser[FinalNode, FinalNode]):
class SimpleSQLParser(SQLParser[FinalNode]):
def transform(self, node):
return node
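
The net effect in base.py is that the parser generics collapse from two node type parameters to one: ConfiguredParser[ConfiguredBlockType, FinalNode], SQLParser[FinalNode], and SimpleSQLParser[FinalNode], with transform() reduced to an identity hook. An illustrative, self-contained sketch of that single-TypeVar shape (DemoConfiguredParser, DemoSQLParser, Block, and Node are made-up names, not dbt classes):

# Illustrative sketch of the one-TypeVar parser pattern this diff adopts;
# none of these names exist in dbt-core.
from typing import Generic, TypeVar

Block = TypeVar("Block")
Node = TypeVar("Node")


class DemoConfiguredParser(Generic[Block, Node]):
    def parse_from_dict(self, dct: dict) -> Node:
        # Builds the final node type directly; there is no intermediate node class.
        raise NotImplementedError

    def transform(self, node: Node) -> Node:
        # With no intermediate-to-final conversion left, this is just an identity hook.
        return node


class DemoSQLParser(DemoConfiguredParser[str, Node], Generic[Node]):
    # Block type pinned (dbt pins it to FileBlock); still generic over the node type only.
    pass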
24 changes: 8 additions & 16 deletions core/dbt/parser/snapshots.py
@@ -1,7 +1,7 @@
import os
from typing import List

from dbt.contracts.graph.nodes import IntermediateSnapshotNode, SnapshotNode
from dbt.contracts.graph.nodes import SnapshotNode
from dbt.exceptions import SnapshopConfigError
from dbt.node_types import NodeType
from dbt.parser.base import SQLParser
@@ -10,11 +10,11 @@
from dbt_common.dataclass_schema import ValidationError


class SnapshotParser(SQLParser[IntermediateSnapshotNode, SnapshotNode]):
def parse_from_dict(self, dct, validate=True) -> IntermediateSnapshotNode:
class SnapshotParser(SQLParser[SnapshotNode]):
def parse_from_dict(self, dct, validate=True) -> SnapshotNode:
if validate:
IntermediateSnapshotNode.validate(dct)
return IntermediateSnapshotNode.from_dict(dct)
SnapshotNode.validate(dct)
return SnapshotNode.from_dict(dct)

@property
def resource_type(self) -> NodeType:
@@ -54,18 +54,10 @@ def get_fqn(self, path: str, name: str) -> List[str]:
fqn.append(name)
return fqn

def transform(self, node: IntermediateSnapshotNode) -> SnapshotNode:
def transform(self, node: SnapshotNode) -> SnapshotNode:
try:
# The config_call_dict is not serialized, because normally
# it is not needed after parsing. But since the snapshot node
# does this extra to_dict, save and restore it, to keep
# the model config when there is also schema config.
config_call_dict = node.config_call_dict
dct = node.to_dict(omit_none=True)
parsed_node = SnapshotNode.from_dict(dct)
parsed_node.config_call_dict = config_call_dict
self.set_snapshot_attributes(parsed_node)
return parsed_node
self.set_snapshot_attributes(node)
return node
except ValidationError as exc:
raise SnapshopConfigError(exc, node)

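
Because parse_from_dict now validates and builds a SnapshotNode directly, transform no longer needs the to_dict/from_dict round trip or the config_call_dict save-and-restore; it only applies the snapshot attributes. A hedged restatement of the new shape (set_snapshot_attributes is passed in as a plain callable here purely for illustration; in dbt it is a SnapshotParser method):

# Sketch based on this diff, not a drop-in replacement for the parser method.
from dbt.contracts.graph.nodes import SnapshotNode


def transform_sketch(node: SnapshotNode, set_snapshot_attributes) -> SnapshotNode:
    # No serialization round trip and no config_call_dict juggling: the parser
    # already produced a SnapshotNode, so we only finish populating it.
    set_snapshot_attributes(node)
    return node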
100 changes: 0 additions & 100 deletions tests/unit/contracts/graph/test_nodes_parsed.py
@@ -30,7 +30,6 @@
from dbt.artifacts.resources.types import TimePeriod
from dbt.contracts.files import FileHash
from dbt.contracts.graph.model_config import (
EmptySnapshotConfig,
ModelConfig,
NodeConfig,
SeedConfig,
@@ -44,7 +43,6 @@
Exposure,
GenericTestNode,
HookNode,
IntermediateSnapshotNode,
Macro,
Metric,
ModelNode,
@@ -1587,51 +1585,6 @@ def basic_timestamp_snapshot_object():
)


@pytest.fixture
def basic_intermediate_timestamp_snapshot_object():
cfg = EmptySnapshotConfig()
cfg._extra.update(
{
"strategy": "timestamp",
"unique_key": "id",
"updated_at": "last_update",
"target_database": "some_snapshot_db",
"target_schema": "some_snapshot_schema",
}
)

return IntermediateSnapshotNode(
package_name="test",
path="/root/x/path.sql",
original_file_path="/root/path.sql",
language="sql",
raw_code="select * from wherever",
name="foo",
resource_type=NodeType.Snapshot,
unique_id="model.test.foo",
fqn=["test", "models", "foo"],
refs=[],
sources=[],
metrics=[],
depends_on=DependsOn(),
description="",
database="test_db",
schema="test_schema",
alias="bar",
tags=[],
config=cfg,
checksum=FileHash.from_contents(""),
created_at=1,
unrendered_config={
"strategy": "timestamp",
"unique_key": "id",
"updated_at": "last_update",
"target_database": "some_snapshot_db",
"target_schema": "some_snapshot_schema",
},
)


@pytest.fixture
def basic_check_snapshot_dict():
return {
@@ -1734,64 +1687,14 @@ def basic_check_snapshot_object():
)


@pytest.fixture
def basic_intermediate_check_snapshot_object():
cfg = EmptySnapshotConfig()
cfg._extra.update(
{
"unique_key": "id",
"strategy": "check",
"check_cols": "all",
"target_database": "some_snapshot_db",
"target_schema": "some_snapshot_schema",
}
)

return IntermediateSnapshotNode(
package_name="test",
path="/root/x/path.sql",
original_file_path="/root/path.sql",
language="sql",
raw_code="select * from wherever",
name="foo",
resource_type=NodeType.Snapshot,
unique_id="model.test.foo",
fqn=["test", "models", "foo"],
refs=[],
sources=[],
metrics=[],
depends_on=DependsOn(),
description="",
database="test_db",
schema="test_schema",
alias="bar",
tags=[],
config=cfg,
checksum=FileHash.from_contents(""),
created_at=1.0,
unrendered_config={
"target_database": "some_snapshot_db",
"target_schema": "some_snapshot_schema",
"unique_key": "id",
"strategy": "check",
"check_cols": "all",
},
)


def test_timestamp_snapshot_ok(
basic_timestamp_snapshot_dict,
basic_timestamp_snapshot_object,
basic_intermediate_timestamp_snapshot_object,
):
node_dict = basic_timestamp_snapshot_dict
node = basic_timestamp_snapshot_object
inter = basic_intermediate_timestamp_snapshot_object

assert_symmetric(node, node_dict, SnapshotNode)
# node_from_dict = SnapshotNode.from_dict(inter.to_dict(omit_none=True))
# node_from_dict.created_at = 1
assert SnapshotNode.from_dict(inter.to_dict(omit_none=True)) == node
assert node.is_refable is True
assert node.is_ephemeral is False
pickle.loads(pickle.dumps(node))
Expand All @@ -1800,14 +1703,11 @@ def test_timestamp_snapshot_ok(
def test_check_snapshot_ok(
basic_check_snapshot_dict,
basic_check_snapshot_object,
basic_intermediate_check_snapshot_object,
):
node_dict = basic_check_snapshot_dict
node = basic_check_snapshot_object
inter = basic_intermediate_check_snapshot_object

assert_symmetric(node, node_dict, SnapshotNode)
assert SnapshotNode.from_dict(inter.to_dict(omit_none=True)) == node
assert node.is_refable is True
assert node.is_ephemeral is False
pickle.loads(pickle.dumps(node))
