diff --git a/.changes/unreleased/Fixes-20240429-114610.yaml b/.changes/unreleased/Fixes-20240429-114610.yaml new file mode 100644 index 00000000000..97e377a0216 --- /dev/null +++ b/.changes/unreleased/Fixes-20240429-114610.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Support overriding source level loaded_at_field with a null table level definition +time: 2024-04-29T11:46:10.100373-05:00 +custom: + Author: emmyoop + Issue: "9320" diff --git a/core/dbt/contracts/graph/unparsed.py b/core/dbt/contracts/graph/unparsed.py index caeaa5cee85..ef2012221b5 100644 --- a/core/dbt/contracts/graph/unparsed.py +++ b/core/dbt/contracts/graph/unparsed.py @@ -269,6 +269,7 @@ class UnparsedMacroUpdate(HasConfig, HasColumnProps, HasYamlMetadata): class UnparsedSourceTableDefinition(HasColumnTests, HasColumnAndTestProps): config: Dict[str, Any] = field(default_factory=dict) loaded_at_field: Optional[str] = None + loaded_at_field_present: Optional[bool] = None identifier: Optional[str] = None quoting: Quoting = field(default_factory=Quoting) freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold) @@ -293,10 +294,22 @@ class UnparsedSourceDefinition(dbtClassMixin): quoting: Quoting = field(default_factory=Quoting) freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold) loaded_at_field: Optional[str] = None + loaded_at_field_present: Optional[bool] = None tables: List[UnparsedSourceTableDefinition] = field(default_factory=list) tags: List[str] = field(default_factory=list) config: Dict[str, Any] = field(default_factory=dict) + @classmethod + def validate(cls, data): + super(UnparsedSourceDefinition, cls).validate(data) + + if data.get("loaded_at_field", None) == "": + raise ValidationError("loaded_at_field cannot be an empty string.") + if "tables" in data: + for table in data["tables"]: + if table.get("loaded_at_field", None) == "": + raise ValidationError("loaded_at_field cannot be an empty string.") + @property def yaml_key(self) -> "str": return "sources" @@ -316,6 +329,7 @@ class SourceTablePatch(dbtClassMixin): data_type: Optional[str] = None docs: Optional[Docs] = None loaded_at_field: Optional[str] = None + loaded_at_field_present: Optional[bool] = None identifier: Optional[str] = None quoting: Quoting = field(default_factory=Quoting) freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold) @@ -358,6 +372,7 @@ class SourcePatch(dbtClassMixin): quoting: Optional[Quoting] = None freshness: Optional[Optional[FreshnessThreshold]] = field(default_factory=FreshnessThreshold) loaded_at_field: Optional[str] = None + loaded_at_field_present: Optional[bool] = None tables: Optional[List[SourceTablePatch]] = None tags: Optional[List[str]] = None diff --git a/core/dbt/parser/schemas.py b/core/dbt/parser/schemas.py index 838939b83fc..68fafba85f5 100644 --- a/core/dbt/parser/schemas.py +++ b/core/dbt/parser/schemas.py @@ -111,11 +111,24 @@ # =============================================================================== -def yaml_from_file(source_file: SchemaSourceFile) -> Dict[str, Any]: +def yaml_from_file(source_file: SchemaSourceFile) -> Optional[Dict[str, Any]]: """If loading the yaml fails, raise an exception.""" try: # source_file.contents can sometimes be None - return load_yaml_text(source_file.contents or "", source_file.path) + contents = load_yaml_text(source_file.contents or "", source_file.path) + + if contents is None: + return contents + + # When loaded_loaded_at_field is defined as None or null, it shows up in + # the dict but when it is not defined, it does not show up in the dict + # We need to capture this to be able to override source level settings later. + for source in contents.get("sources", []): + for table in source.get("tables", []): + if "loaded_at_field" in table: + table["loaded_at_field_present"] = True + + return contents except DbtValidationError as e: raise YamlLoadError( project_name=source_file.project_name, path=source_file.path.relative_path, exc=e diff --git a/core/dbt/parser/sources.py b/core/dbt/parser/sources.py index 1f57efe79ce..f94262aac03 100644 --- a/core/dbt/parser/sources.py +++ b/core/dbt/parser/sources.py @@ -133,7 +133,14 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition: unique_id = target.unique_id description = table.description or "" source_description = source.description or "" - loaded_at_field = table.loaded_at_field or source.loaded_at_field + + # We need to be able to tell the difference between explicitly setting the loaded_at_field to None/null + # and when it's simply not set. This allows a user to override the source level loaded_at_field so that + # specific table can default to metadata-based freshness. + if table.loaded_at_field_present or table.loaded_at_field is not None: + loaded_at_field = table.loaded_at_field + else: + loaded_at_field = source.loaded_at_field # may be None, that's okay freshness = merge_freshness(source.freshness, table.freshness) quoting = source.quoting.merged(table.quoting) diff --git a/tests/functional/sources/test_source_loaded_at_field.py b/tests/functional/sources/test_source_loaded_at_field.py new file mode 100644 index 00000000000..bc5e5fc05bc --- /dev/null +++ b/tests/functional/sources/test_source_loaded_at_field.py @@ -0,0 +1,136 @@ +import pytest +from dbt.tests.util import run_dbt, get_manifest, write_file +from dbt.exceptions import YamlParseDictError + + +loaded_at_field_null_schema_yml = """ +sources: + - name: test_source + freshness: + warn_after: + count: 1 + period: day + error_after: + count: 4 + period: day + loaded_at_field: updated_at + tables: + - name: table1 + loaded_at_field: null +""" + +loaded_at_field_blank_schema_yml = """ +sources: + - name: test_source + freshness: + warn_after: + count: 1 + period: day + error_after: + count: 4 + period: day + loaded_at_field: updated_at + tables: + - name: table1 + loaded_at_field: null +""" + +loaded_at_field_missing_schema_yml = """ +sources: + - name: test_source + freshness: + warn_after: + count: 1 + period: day + error_after: + count: 4 + period: day + loaded_at_field: updated_at + tables: + - name: table1 +""" + +loaded_at_field_defined_schema_yml = """ +sources: + - name: test_source + freshness: + warn_after: + count: 1 + period: day + error_after: + count: 4 + period: day + loaded_at_field: updated_at + tables: + - name: table1 + loaded_at_field: updated_at_another_place +""" + +loaded_at_field_empty_string_schema_yml = """ +sources: + - name: test_source + freshness: + warn_after: + count: 1 + period: day + error_after: + count: 4 + period: day + loaded_at_field: updated_at + tables: + - name: table1 + loaded_at_field: "" +""" + + +class TestParsingLoadedAtField: + @pytest.fixture(scope="class") + def models(self): + return {"schema.yml": loaded_at_field_null_schema_yml} + + def test_loaded_at_field(self, project): + # test setting loaded_at_field to null explicitly at table level + run_dbt(["parse"]) + manifest = get_manifest(project.project_root) + + assert "source.test.test_source.table1" in manifest.sources + assert manifest.sources.get("source.test.test_source.table1").loaded_at_field is None + + # test setting loaded_at_field at source level, do not set at table level + # end up with source level loaded_at_field + write_file( + loaded_at_field_missing_schema_yml, project.project_root, "models", "schema.yml" + ) + run_dbt(["parse"]) + manifest = get_manifest(project.project_root) + assert "source.test.test_source.table1" in manifest.sources + assert ( + manifest.sources.get("source.test.test_source.table1").loaded_at_field == "updated_at" + ) + + # test setting loaded_at_field to nothing, should override Source value for None + write_file(loaded_at_field_blank_schema_yml, project.project_root, "models", "schema.yml") + run_dbt(["parse"]) + manifest = get_manifest(project.project_root) + + assert "source.test.test_source.table1" in manifest.sources + assert manifest.sources.get("source.test.test_source.table1").loaded_at_field is None + + # test setting loaded_at_field at table level to a value - it should override source level + write_file( + loaded_at_field_defined_schema_yml, project.project_root, "models", "schema.yml" + ) + run_dbt(["parse"]) + manifest = get_manifest(project.project_root) + assert "source.test.test_source.table1" in manifest.sources + assert ( + manifest.sources.get("source.test.test_source.table1").loaded_at_field + == "updated_at_another_place" + ) + + # test setting loaded_at_field at table level to an empty string - should error + write_file( + loaded_at_field_empty_string_schema_yml, project.project_root, "models", "schema.yml" + ) + with pytest.raises(YamlParseDictError): + run_dbt(["parse"])