Skip to content

Commit

Permalink
Recognize when loaded_at_field is explicitly set to null (#10065)
Browse files Browse the repository at this point in the history
* add support for explicit nulls for loaded_at_field

* add test

* changelog

* add parsing for tests

* centralize logic a bit

* account for sources being None

* fix bug

* remove new field from SourceDefinition

* add validation for empty string, mroe tests
  • Loading branch information
emmyoop authored May 2, 2024
1 parent 487a532 commit 1bcef62
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 3 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240429-114610.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Support overriding source level loaded_at_field with a null table level definition
time: 2024-04-29T11:46:10.100373-05:00
custom:
Author: emmyoop
Issue: "9320"
15 changes: 15 additions & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class UnparsedMacroUpdate(HasConfig, HasColumnProps, HasYamlMetadata):
class UnparsedSourceTableDefinition(HasColumnTests, HasColumnAndTestProps):
config: Dict[str, Any] = field(default_factory=dict)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
Expand All @@ -293,10 +294,22 @@ class UnparsedSourceDefinition(dbtClassMixin):
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
tables: List[UnparsedSourceTableDefinition] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)

@classmethod
def validate(cls, data):
super(UnparsedSourceDefinition, cls).validate(data)

if data.get("loaded_at_field", None) == "":
raise ValidationError("loaded_at_field cannot be an empty string.")
if "tables" in data:
for table in data["tables"]:
if table.get("loaded_at_field", None) == "":
raise ValidationError("loaded_at_field cannot be an empty string.")

@property
def yaml_key(self) -> "str":
return "sources"
Expand All @@ -316,6 +329,7 @@ class SourceTablePatch(dbtClassMixin):
data_type: Optional[str] = None
docs: Optional[Docs] = None
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
Expand Down Expand Up @@ -358,6 +372,7 @@ class SourcePatch(dbtClassMixin):
quoting: Optional[Quoting] = None
freshness: Optional[Optional[FreshnessThreshold]] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
tables: Optional[List[SourceTablePatch]] = None
tags: Optional[List[str]] = None

Expand Down
17 changes: 15 additions & 2 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,24 @@
# ===============================================================================


def yaml_from_file(source_file: SchemaSourceFile) -> Dict[str, Any]:
def yaml_from_file(source_file: SchemaSourceFile) -> Optional[Dict[str, Any]]:
"""If loading the yaml fails, raise an exception."""
try:
# source_file.contents can sometimes be None
return load_yaml_text(source_file.contents or "", source_file.path)
contents = load_yaml_text(source_file.contents or "", source_file.path)

if contents is None:
return contents

# When loaded_loaded_at_field is defined as None or null, it shows up in
# the dict but when it is not defined, it does not show up in the dict
# We need to capture this to be able to override source level settings later.
for source in contents.get("sources", []):
for table in source.get("tables", []):
if "loaded_at_field" in table:
table["loaded_at_field_present"] = True

return contents
except DbtValidationError as e:
raise YamlLoadError(
project_name=source_file.project_name, path=source_file.path.relative_path, exc=e
Expand Down
9 changes: 8 additions & 1 deletion core/dbt/parser/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
unique_id = target.unique_id
description = table.description or ""
source_description = source.description or ""
loaded_at_field = table.loaded_at_field or source.loaded_at_field

# We need to be able to tell the difference between explicitly setting the loaded_at_field to None/null
# and when it's simply not set. This allows a user to override the source level loaded_at_field so that
# specific table can default to metadata-based freshness.
if table.loaded_at_field_present or table.loaded_at_field is not None:
loaded_at_field = table.loaded_at_field
else:
loaded_at_field = source.loaded_at_field # may be None, that's okay

freshness = merge_freshness(source.freshness, table.freshness)
quoting = source.quoting.merged(table.quoting)
Expand Down
136 changes: 136 additions & 0 deletions tests/functional/sources/test_source_loaded_at_field.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import pytest
from dbt.tests.util import run_dbt, get_manifest, write_file
from dbt.exceptions import YamlParseDictError


loaded_at_field_null_schema_yml = """
sources:
- name: test_source
freshness:
warn_after:
count: 1
period: day
error_after:
count: 4
period: day
loaded_at_field: updated_at
tables:
- name: table1
loaded_at_field: null
"""

loaded_at_field_blank_schema_yml = """
sources:
- name: test_source
freshness:
warn_after:
count: 1
period: day
error_after:
count: 4
period: day
loaded_at_field: updated_at
tables:
- name: table1
loaded_at_field: null
"""

loaded_at_field_missing_schema_yml = """
sources:
- name: test_source
freshness:
warn_after:
count: 1
period: day
error_after:
count: 4
period: day
loaded_at_field: updated_at
tables:
- name: table1
"""

loaded_at_field_defined_schema_yml = """
sources:
- name: test_source
freshness:
warn_after:
count: 1
period: day
error_after:
count: 4
period: day
loaded_at_field: updated_at
tables:
- name: table1
loaded_at_field: updated_at_another_place
"""

loaded_at_field_empty_string_schema_yml = """
sources:
- name: test_source
freshness:
warn_after:
count: 1
period: day
error_after:
count: 4
period: day
loaded_at_field: updated_at
tables:
- name: table1
loaded_at_field: ""
"""


class TestParsingLoadedAtField:
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": loaded_at_field_null_schema_yml}

def test_loaded_at_field(self, project):
# test setting loaded_at_field to null explicitly at table level
run_dbt(["parse"])
manifest = get_manifest(project.project_root)

assert "source.test.test_source.table1" in manifest.sources
assert manifest.sources.get("source.test.test_source.table1").loaded_at_field is None

# test setting loaded_at_field at source level, do not set at table level
# end up with source level loaded_at_field
write_file(
loaded_at_field_missing_schema_yml, project.project_root, "models", "schema.yml"
)
run_dbt(["parse"])
manifest = get_manifest(project.project_root)
assert "source.test.test_source.table1" in manifest.sources
assert (
manifest.sources.get("source.test.test_source.table1").loaded_at_field == "updated_at"
)

# test setting loaded_at_field to nothing, should override Source value for None
write_file(loaded_at_field_blank_schema_yml, project.project_root, "models", "schema.yml")
run_dbt(["parse"])
manifest = get_manifest(project.project_root)

assert "source.test.test_source.table1" in manifest.sources
assert manifest.sources.get("source.test.test_source.table1").loaded_at_field is None

# test setting loaded_at_field at table level to a value - it should override source level
write_file(
loaded_at_field_defined_schema_yml, project.project_root, "models", "schema.yml"
)
run_dbt(["parse"])
manifest = get_manifest(project.project_root)
assert "source.test.test_source.table1" in manifest.sources
assert (
manifest.sources.get("source.test.test_source.table1").loaded_at_field
== "updated_at_another_place"
)

# test setting loaded_at_field at table level to an empty string - should error
write_file(
loaded_at_field_empty_string_schema_yml, project.project_root, "models", "schema.yml"
)
with pytest.raises(YamlParseDictError):
run_dbt(["parse"])

0 comments on commit 1bcef62

Please sign in to comment.