Skip to content

Commit

Permalink
Custom SQL for get source maxLoadedAt
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenyuLInx committed Dec 18, 2024
1 parent 7df04b0 commit 8780b8c
Show file tree
Hide file tree
Showing 14 changed files with 366 additions and 36 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241217-171631.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Calculate source freshness via a SQL query
time: 2024-12-17T17:16:31.841076-08:00
custom:
Author: ChenyuLInx
Issue: "8797"
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/source_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ParsedSourceMandatory(GraphResource, HasRelationMetadata):
class SourceDefinition(ParsedSourceMandatory):
quoting: Quoting = field(default_factory=Quoting)
loaded_at_field: Optional[str] = None
loaded_at_query: Optional[str] = None
freshness: Optional[FreshnessThreshold] = None
external: Optional[ExternalTable] = None
description: str = ""
Expand Down
18 changes: 16 additions & 2 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ class OperationProvider(RuntimeProvider):

# Base context collection, used for parsing configs.
class ProviderContext(ManifestContext):
# subclasses are MacroContext, ModelContext, TestContext
# subclasses are MacroContext, ModelContext, TestContext, SourceContext
def __init__(
self,
model,
Expand All @@ -893,7 +893,7 @@ def __init__(
raise DbtInternalError(f"Invalid provider given to context: {provider}")
# mypy appeasement - we know it'll be a RuntimeConfig
self.config: RuntimeConfig
self.model: Union[Macro, ManifestNode] = model
self.model: Union[Macro, ManifestNode, SourceDefinition] = model
super().__init__(config, manifest, model.package_name)
self.sql_results: Dict[str, Optional[AttrDict]] = {}
self.context_config: Optional[ContextConfig] = context_config
Expand Down Expand Up @@ -1558,6 +1558,20 @@ def __init__(
self._search_package = search_package


class SourceContext(ProviderContext):
# SourceContext is being used to render jinja SQL during execution of
# custom SQL in source freshness. It is not used for parsing.
model: SourceDefinition

@contextproperty()
def this(self) -> Optional[RelationProxy]:
return self.db_wrapper.Relation.create_from(self.config, self.model)

Check warning on line 1568 in core/dbt/context/providers.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/context/providers.py#L1568

Added line #L1568 was not covered by tests

@contextproperty()
def source_node(self) -> SourceDefinition:
return self.model

Check warning on line 1572 in core/dbt/context/providers.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/context/providers.py#L1572

Added line #L1572 was not covered by tests


class ModelContext(ProviderContext):
model: ManifestNode

Expand Down
4 changes: 4 additions & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ class UnparsedSourceTableDefinition(HasColumnTests, HasColumnAndTestProps):
config: Dict[str, Any] = field(default_factory=dict)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
Expand All @@ -342,6 +343,7 @@ class UnparsedSourceDefinition(dbtClassMixin):
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
tables: List[UnparsedSourceTableDefinition] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
Expand Down Expand Up @@ -379,6 +381,7 @@ class SourceTablePatch(dbtClassMixin):
docs: Optional[Docs] = None
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
Expand Down Expand Up @@ -422,6 +425,7 @@ class SourcePatch(dbtClassMixin):
freshness: Optional[Optional[FreshnessThreshold]] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
tables: Optional[List[SourceTablePatch]] = None
tags: Optional[List[str]] = None

Expand Down
17 changes: 13 additions & 4 deletions core/dbt/parser/schema_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,21 @@ def _is_norender_key(self, keypath: Keypath) -> bool:
"tests" and "data_tests" are both currently supported but "tests" has been deprecated
"""
# top level descriptions and data_tests
if len(keypath) >= 1 and keypath[0] in ("tests", "data_tests", "description"):
if len(keypath) >= 1 and keypath[0] in (
"tests",
"data_tests",
"description",
"loaded_at_query",
):
return True

# columns descriptions and data_tests
if len(keypath) == 2 and keypath[1] in ("tests", "data_tests", "description"):
if len(keypath) == 2 and keypath[1] in (
"tests",
"data_tests",
"description",
"loaded_at_query",
):
return True

# pre- and post-hooks
Expand Down Expand Up @@ -69,9 +79,8 @@ def _is_norender_key(self, keypath: Keypath) -> bool:
def should_render_keypath(self, keypath: Keypath) -> bool:
if len(keypath) < 1:
return True

if self.key == "sources":
if keypath[0] == "description":
if keypath[0] in ("description", "loaded_at_query"):
return False
if keypath[0] == "tables":
if self._is_norender_key(keypath[2:]):
Expand Down
19 changes: 19 additions & 0 deletions core/dbt/parser/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
UnparsedSourceTableDefinition,
)
from dbt.events.types import FreshnessConfigProblem, UnusedTables
from dbt.exceptions import ParsingError
from dbt.node_types import NodeType
from dbt.parser.common import ParserRef
from dbt.parser.schema_generic_tests import SchemaGenericTestParser
Expand Down Expand Up @@ -131,11 +132,28 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
# We need to be able to tell the difference between explicitly setting the loaded_at_field to None/null
# and when it's simply not set. This allows a user to override the source level loaded_at_field so that
# specific table can default to metadata-based freshness.
if table.loaded_at_field_present and table.loaded_at_query:
raise ParsingError(
"Cannot specify both loaded_at_field and loaded_at_query at table level."
)
if source.loaded_at_field and source.loaded_at_query:
raise ParsingError(
"Cannot specify both loaded_at_field and loaded_at_query at source level."
)

if table.loaded_at_field_present or table.loaded_at_field is not None:
loaded_at_field = table.loaded_at_field
else:
loaded_at_field = source.loaded_at_field # may be None, that's okay

loaded_at_query: Optional[str]
if table.loaded_at_query is not None:
loaded_at_query = table.loaded_at_query
else:
if table.loaded_at_field_present:
loaded_at_query = None
else:
loaded_at_query = source.loaded_at_query
freshness = merge_freshness(source.freshness, table.freshness)
quoting = source.quoting.merged(table.quoting)
# path = block.path.original_file_path
Expand Down Expand Up @@ -185,6 +203,7 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
meta=meta,
loader=source.loader,
loaded_at_field=loaded_at_field,
loaded_at_query=loaded_at_query,
freshness=freshness,
quoting=quoting,
resource_type=NodeType.Source,
Expand Down
20 changes: 18 additions & 2 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
PartialSourceFreshnessResult,
SourceFreshnessResult,
)
from dbt.clients import jinja
from dbt.context.providers import RuntimeProvider, SourceContext
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import HookNode, SourceDefinition
from dbt.contracts.results import RunStatus
Expand Down Expand Up @@ -114,7 +116,22 @@ def execute(self, compiled_node, manifest):
adapter_response: Optional[AdapterResponse] = None
freshness: Optional[FreshnessResponse] = None

if compiled_node.loaded_at_field is not None:
if compiled_node.loaded_at_query is not None:
# within the context user can have access to `this`, `source_node`(`model` will point to the same thing), etc
compiled_code = jinja.get_rendered(

Check warning on line 121 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L121

Added line #L121 was not covered by tests
compiled_node.loaded_at_query,
SourceContext(
compiled_node, self.config, manifest, RuntimeProvider(), None
).to_dict(),
compiled_node,
)
adapter_response, freshness = self.adapter.calculate_freshness_from_custom_sql(

Check warning on line 128 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L128

Added line #L128 was not covered by tests
relation,
compiled_code,
macro_resolver=manifest,
)
status = compiled_node.freshness.status(freshness["age"])

Check warning on line 133 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L133

Added line #L133 was not covered by tests
elif compiled_node.loaded_at_field is not None:
adapter_response, freshness = self.adapter.calculate_freshness(
relation,
compiled_node.loaded_at_field,
Expand Down Expand Up @@ -146,7 +163,6 @@ def execute(self, compiled_node, manifest):
raise DbtRuntimeError(
f"Could not compute freshness for source {compiled_node.name}: no 'loaded_at_field' provided and {self.adapter.type()} adapter does not support metadata-based freshness checks."
)

# adapter_response was not returned in previous versions, so this will be None
# we cannot call to_dict() on NoneType
if adapter_response:
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git+https://github.com/dbt-labs/dbt-adapters.git@main
git+https://github.com/dbt-labs/dbt-adapters.git@cl/custom_freshness_sql
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git@main
git+https://github.com/dbt-labs/dbt-postgres.git@main
Expand Down
Loading

0 comments on commit 8780b8c

Please sign in to comment.