Skip to content

Commit

Permalink
add model freshness for adaptive job
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenyuLInx committed Dec 19, 2024
1 parent bf18b59 commit b6b56a9
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 3 deletions.
7 changes: 6 additions & 1 deletion core/dbt/artifacts/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@
MetricTimeWindow,
MetricTypeParams,
)
from dbt.artifacts.resources.v1.model import Model, ModelConfig, TimeSpine
from dbt.artifacts.resources.v1.model import (
Model,
ModelConfig,
ModelFreshness,
TimeSpine,
)
from dbt.artifacts.resources.v1.owner import Owner
from dbt.artifacts.resources.v1.saved_query import (
Export,
Expand Down
16 changes: 15 additions & 1 deletion core/dbt/artifacts/resources/v1/model.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Literal, Optional

from dbt.artifacts.resources.types import AccessType, NodeType
from dbt.artifacts.resources.types import AccessType, NodeType, TimePeriod
from dbt.artifacts.resources.v1.components import (
CompiledResource,
DeferRelation,
NodeVersion,
Time,
)
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt_common.contracts.config.base import MergeBehavior
Expand Down Expand Up @@ -34,6 +36,17 @@ class TimeSpine(dbtClassMixin):
custom_granularities: List[CustomGranularity] = field(default_factory=list)


class ModelFreshnessDependsOnOptions(str, enum.Enum):
    """Allowed values for the ``depends_on`` setting of a model's freshness.

    Implemented as a ``str``/``enum.Enum`` mixin instead of ``enum.StrEnum``:
    ``StrEnum`` was only added to the standard library in Python 3.11, so
    referencing it raises ``AttributeError`` at import time on older
    interpreters. Members still compare equal to their plain string values
    (e.g. ``ModelFreshnessDependsOnOptions.any == "any"``).
    """

    # NOTE(review): presumably selects whether all, or any one, of the
    # upstream dependencies must be updated -- confirm against the feature spec.
    all = "all"
    any = "any"

    def __str__(self) -> str:
        # Mirror ``enum.StrEnum``: str()/format() yield the raw value rather
        # than the qualified member name.
        return self.value


def _default_build_after() -> Time:
    """Build the fallback ``build_after`` window: zero hours."""
    return Time(period=TimePeriod.hour, count=0)


@dataclass
class ModelFreshness(dbtClassMixin):
    """Freshness settings that can be attached to a model.

    Both fields are optional when constructing an instance; a value that is
    not supplied falls back to the default noted next to the field.
    """

    # Defaults to the ``any`` option.
    depends_on: ModelFreshnessDependsOnOptions = ModelFreshnessDependsOnOptions.any
    # A fresh ``Time`` is created per instance so the default is never shared.
    build_after: Time = field(default_factory=_default_build_after)


@dataclass
class Model(CompiledResource):
resource_type: Literal[NodeType.Model]
Expand All @@ -46,6 +59,7 @@ class Model(CompiledResource):
defer_relation: Optional[DeferRelation] = None
primary_key: List[str] = field(default_factory=list)
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.artifacts.resources.v1.model import ModelFreshness
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.clients.jinja_static import statically_extract_has_name_this
from dbt.contracts.graph.model_config import UnitTestNodeConfig
Expand Down Expand Up @@ -1698,6 +1699,7 @@ class ParsedNodePatch(ParsedPatch):
constraints: List[Dict[str, Any]]
deprecation_date: Optional[datetime]
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None


@dataclass
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
MacroArgument,
MaturityType,
MeasureAggregationParameters,
ModelFreshness,
NodeVersion,
Owner,
Quoting,
Expand Down Expand Up @@ -221,6 +222,7 @@ class UnparsedModelUpdate(UnparsedNodeUpdate):
versions: Sequence[UnparsedVersion] = field(default_factory=list)
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None

def __post_init__(self) -> None:
if self.latest_version:
Expand Down
21 changes: 20 additions & 1 deletion core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Type, TypeVar

from dbt.artifacts.resources import RefArgs
from dbt.artifacts.resources.v1.model import CustomGranularity, TimeSpine
from dbt.artifacts.resources.v1.components import Time
from dbt.artifacts.resources.v1.model import (
CustomGranularity,
ModelFreshness,
TimeSpine,
)
from dbt.clients.jinja_static import statically_parse_ref_or_source
from dbt.clients.yaml_helper import load_yaml_text
from dbt.config import RuntimeConfig
Expand Down Expand Up @@ -722,6 +727,7 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None:
# code consistency.
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None
if isinstance(block.target, UnparsedModelUpdate):
deprecation_date = block.target.deprecation_date
time_spine = (
Expand All @@ -738,6 +744,17 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None:
if block.target.time_spine
else None
)
freshness = (
ModelFreshness(
depends_on=block.target.freshness.depends_on,
build_after=Time(
count=block.target.freshness.build_after.count,
period=block.target.freshness.build_after.period,
),
)
if block.target.freshness
else None
)
patch = ParsedNodePatch(
name=block.target.name,
original_file_path=block.target.original_file_path,
Expand All @@ -754,6 +771,7 @@ def parse_patch(self, block: TargetBlock[NodeTarget], refs: ParserRef) -> None:
constraints=block.target.constraints,
deprecation_date=deprecation_date,
time_spine=time_spine,
freshness=freshness,
)
assert isinstance(self.yaml.file, SchemaSourceFile)
source_file: SchemaSourceFile = self.yaml.file
Expand Down Expand Up @@ -1043,6 +1061,7 @@ def patch_node_properties(self, node, patch: "ParsedNodePatch") -> None:
# These two will have to be reapplied after config is built for versioned models
self.patch_constraints(node, patch.constraints)
self.patch_time_spine(node, patch.time_spine)
node.freshness = patch.freshness
node.build_contract_checksum()

def patch_constraints(self, node: ModelNode, constraints: List[Dict[str, Any]]) -> None:
Expand Down
110 changes: 110 additions & 0 deletions schemas/dbt/manifest/v12.json
Original file line number Diff line number Diff line change
Expand Up @@ -4777,6 +4777,61 @@
}
],
"default": null
},
"freshness": {
"anyOf": [
{
"type": "object",
"title": "ModelFreshness",
"properties": {
"depends_on": {
"enum": [
"all",
"any"
],
"default": "any"
},
"build_after": {
"type": "object",
"title": "Time",
"properties": {
"count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
},
"period": {
"anyOf": [
{
"enum": [
"minute",
"hour",
"day"
]
},
{
"type": "null"
}
],
"default": null
}
},
"additionalProperties": false
}
},
"additionalProperties": false
},
{
"type": "null"
}
],
"default": null
}
},
"additionalProperties": false,
Expand Down Expand Up @@ -14574,6 +14629,61 @@
}
],
"default": null
},
"freshness": {
"anyOf": [
{
"type": "object",
"title": "ModelFreshness",
"properties": {
"depends_on": {
"enum": [
"all",
"any"
],
"default": "any"
},
"build_after": {
"type": "object",
"title": "Time",
"properties": {
"count": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
},
"period": {
"anyOf": [
{
"enum": [
"minute",
"hour",
"day"
]
},
{
"type": "null"
}
],
"default": null
}
},
"additionalProperties": false
}
},
"additionalProperties": false
},
{
"type": "null"
}
],
"default": null
}
},
"additionalProperties": false,
Expand Down
39 changes: 39 additions & 0 deletions tests/unit/parser/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from dbt import tracking
from dbt.artifacts.resources import ModelConfig, RefArgs
from dbt.artifacts.resources.v1.components import Time
from dbt.context.context_config import ContextConfig
from dbt.contracts.files import FileHash, FilePath, SchemaSourceFile, SourceFile
from dbt.contracts.graph.manifest import Manifest
Expand Down Expand Up @@ -301,6 +302,22 @@ def assertEqualNodes(node_one, node_two):
arg: 100
"""

# Schema YAML describing one model (`my_model`) whose `freshness` block sets
# only `build_after` (count 1, period day); `depends_on` is not specified.
# NOTE(review): "TALBE" looks like a typo for "TABLE" (compare
# SINGLE_TABLE_MODEL_TESTS_WRONG_SEVERITY); renaming requires updating the
# tests that reference this constant at the same time.
SINGLE_TALBE_MODEL_FRESHNESS = """
models:
- name: my_model
description: A description of my model
freshness:
build_after: {count: 1, period: day}
"""

# Schema YAML describing one model (`my_model`) whose `freshness` block sets
# only `depends_on: all`; `build_after` is not specified.
# NOTE(review): same "TALBE" misspelling as the constant above.
SINGLE_TALBE_MODEL_FRESHNESS_ONLY_DEPEND_ON = """
models:
- name: my_model
description: A description of my model
freshness:
depends_on: all
"""


MULTIPLE_TABLE_VERSIONED_MODEL_TESTS = """
models:
Expand Down Expand Up @@ -607,6 +624,28 @@ def test__read_basic_model_tests(self):
self.assertEqual(len(list(self.parser.manifest.sources)), 0)
self.assertEqual(len(list(self.parser.manifest.nodes)), 4)

def test__parse_model_freshness(self):
    """Parsing a model with only ``build_after`` set keeps the ``any`` default.

    The fixture declares ``build_after: {count: 1, period: day}`` and no
    ``depends_on``; the parsed node must carry that window and report
    ``depends_on`` as ``"any"``.
    """
    block = self.file_block_for(SINGLE_TALBE_MODEL_FRESHNESS, "test_one.yml")
    # Register the schema file with the manifest before parsing it.
    self.parser.manifest.files[block.file.file_id] = block.file
    parsed_yaml = yaml_from_file(block.file)
    self.parser.parse_file(block, parsed_yaml)

    manifest = self.parser.manifest
    self.assert_has_manifest_lengths(manifest, nodes=1)

    expected_build_after = Time(count=1, period="day")
    assert manifest.nodes["model.root.my_model"].freshness.depends_on == "any"
    assert manifest.nodes["model.root.my_model"].freshness.build_after == expected_build_after

def test__parse_model_freshness_depend_on(self):
    """Parsing a model with only ``depends_on`` set keeps the default window.

    The fixture declares ``depends_on: all`` and no ``build_after``; the
    parsed node must report ``"all"`` and a ``build_after`` of zero hours.
    """
    block = self.file_block_for(SINGLE_TALBE_MODEL_FRESHNESS_ONLY_DEPEND_ON, "test_one.yml")
    # Register the schema file with the manifest before parsing it.
    self.parser.manifest.files[block.file.file_id] = block.file
    parsed_yaml = yaml_from_file(block.file)
    self.parser.parse_file(block, parsed_yaml)

    manifest = self.parser.manifest
    self.assert_has_manifest_lengths(manifest, nodes=1)

    expected_build_after = Time(count=0, period="hour")
    assert manifest.nodes["model.root.my_model"].freshness.depends_on == "all"
    assert manifest.nodes["model.root.my_model"].freshness.build_after == expected_build_after

def test__read_basic_model_tests_wrong_severity(self):
block = self.yaml_block_for(SINGLE_TABLE_MODEL_TESTS_WRONG_SEVERITY, "test_one.yml")
dct = yaml_from_file(block.file)
Expand Down

0 comments on commit b6b56a9

Please sign in to comment.