diff --git a/.changes/1.10.0.md b/.changes/1.10.0.md new file mode 100644 index 00000000..10fc93bc --- /dev/null +++ b/.changes/1.10.0.md @@ -0,0 +1 @@ +## dbt-adapters 1.10.0 - September 12, 2024 diff --git a/.changes/1.10.1.md b/.changes/1.10.1.md new file mode 100644 index 00000000..01d6fda2 --- /dev/null +++ b/.changes/1.10.1.md @@ -0,0 +1 @@ +## dbt-adapters 1.10.1 - September 16, 2024 diff --git a/.changes/1.4.1.md b/.changes/1.4.1.md new file mode 100644 index 00000000..82a731a2 --- /dev/null +++ b/.changes/1.4.1.md @@ -0,0 +1,13 @@ +## dbt-adapters 1.4.1 - August 09, 2024 + +### Fixes + +- Use model alias for the CTE identifier generated during ephemeral materialization ([#5273](https://github.com/dbt-labs/dbt-adapters/issues/5273)) + +### Under the Hood + +- Updating changie.yaml to add contributors and PR links ([#219](https://github.com/dbt-labs/dbt-adapters/issues/219)) + +### Contributors +- [@jeancochrane](https://github.com/jeancochrane) ([#5273](https://github.com/dbt-labs/dbt-adapters/issues/5273)) +- [@leahwicz](https://github.com/leahwicz) ([#219](https://github.com/dbt-labs/dbt-adapters/issues/219)) diff --git a/.changes/1.5.0.md b/.changes/1.5.0.md new file mode 100644 index 00000000..e4c54bdc --- /dev/null +++ b/.changes/1.5.0.md @@ -0,0 +1,11 @@ +## dbt-adapters 1.5.0 - September 10, 2024 + +### Features + +- Compare 'snapshot_get_time' and snapshot 'updated_at' data types ([#242](https://github.com/dbt-labs/dbt-adapters/issues/242)) +- Add Behavior Flag framework ([#281](https://github.com/dbt-labs/dbt-adapters/issues/281)) +- Add EventTimeFilter to BaseRelation, which renders a filtered relation when start or end is set ([#294](https://github.com/dbt-labs/dbt-adapters/issues/294)) + +### Dependencies + +- Update dbt-common pin to >=1.8 ([#299](https://github.com/dbt-labs/dbt-adapters/pull/299)) diff --git a/.changes/1.6.0.md b/.changes/1.6.0.md new file mode 100644 index 00000000..c109454a --- /dev/null +++ b/.changes/1.6.0.md @@ -0,0 +1,5 @@ +## dbt-adapters 1.6.0 - September 12, 2024 + +### Features + +- Default microbatch strategy implementation and base tests ([#302](https://github.com/dbt-labs/dbt-adapters/issues/302)) diff --git a/.changes/1.6.1.md b/.changes/1.6.1.md new file mode 100644 index 00000000..45b41bcd --- /dev/null +++ b/.changes/1.6.1.md @@ -0,0 +1 @@ +## dbt-adapters 1.6.1 - September 16, 2024 diff --git a/.changes/1.7.0.md b/.changes/1.7.0.md new file mode 100644 index 00000000..efbdc601 --- /dev/null +++ b/.changes/1.7.0.md @@ -0,0 +1,5 @@ +## dbt-adapters 1.7.0 - September 19, 2024 + +### Features + +- Allow configuring of snapshot column names ([#289](https://github.com/dbt-labs/dbt-adapters/issues/289)) diff --git a/.changes/unreleased/Fixes-20240610-195300.yaml b/.changes/unreleased/Fixes-20240610-195300.yaml deleted file mode 100644 index 1f8cd5a5..00000000 --- a/.changes/unreleased/Fixes-20240610-195300.yaml +++ /dev/null @@ -1,6 +0,0 @@ -kind: Fixes -body: Use model alias for the CTE identifier generated during ephemeral materialization -time: 2024-06-10T19:53:00.086488231Z -custom: - Author: jeancochrane - Issue: "5273" diff --git a/.changes/unreleased/Under the Hood-20240801-220551.yaml b/.changes/unreleased/Under the Hood-20240801-220551.yaml deleted file mode 100644 index 25b54a65..00000000 --- a/.changes/unreleased/Under the Hood-20240801-220551.yaml +++ /dev/null @@ -1,6 +0,0 @@ -kind: Under the Hood -body: Updating changie.yaml to add contributors and PR links -time: 2024-08-01T22:05:51.327652-04:00 -custom: - Author: leahwicz - 
Issue: "219" diff --git a/.changes/unreleased/Under the Hood-20240923-184719.yaml b/.changes/unreleased/Under the Hood-20240923-184719.yaml new file mode 100644 index 00000000..35e66b90 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20240923-184719.yaml @@ -0,0 +1,7 @@ +kind: Under the Hood +body: 'dbt-tests-adapters: Add required begin to microbatch model config to BaseMicrobatch + test' +time: 2024-09-23T18:47:19.171618+01:00 +custom: + Author: michelleark + Issue: "315" diff --git a/.github/workflows/precommit-autoupdate.yml b/.github/workflows/precommit-autoupdate.yml new file mode 100644 index 00000000..74976c48 --- /dev/null +++ b/.github/workflows/precommit-autoupdate.yml @@ -0,0 +1,22 @@ +name: "Run pre-commit autoupdate" + +on: + schedule: + - cron: "30 1 * * SAT" + workflow_dispatch: + +permissions: + contents: write + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.sha }} + cancel-in-progress: true + +jobs: + precommit-autoupdate: + name: "Run pre-commit autoupdate" + uses: dbt-labs/actions/.github/workflows/pre-commit-autoupdate.yml + secrets: + TOKEN: ${{ secrets.FISHTOWN_BOT_PAT }} + SLACK_WEBHOOK_PR_URL: ${{ secrets.SLACK_DEV_ADAPTER_PULL_REQUESTS }} + SLACK_WEBHOOK_ALERTS_URL: ${{ secrets.SLACK_DEV_ADAPTER_ALERTS }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index caf34209..0f2a03f7 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -38,7 +38,7 @@ repos: - --max-line-length=99 - --select=E,F,W - --ignore=E203,E501,E704,E741,W503,W504 - - --per-file-ignores=*/__init__.py:F401 + - --per-file-ignores=*/__init__.py:F401,*/conftest.py:F401 - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.9.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index 4146e95e..1287c432 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,52 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html), and is generated by [Changie](https://github.com/miniscruff/changie). 
+## dbt-adapters 1.10.1 - September 16, 2024 + +## dbt-adapters 1.10.0 - September 12, 2024 + +## dbt-adapters 1.7.0 - September 19, 2024 + +### Features + +- Allow configuring of snapshot column names ([#289](https://github.com/dbt-labs/dbt-adapters/issues/289)) + + + +## dbt-adapters 1.6.1 - September 16, 2024 + +## dbt-adapters 1.6.0 - September 12, 2024 + +### Features + +- Default microbatch strategy implementation and base tests ([#302](https://github.com/dbt-labs/dbt-adapters/issues/302)) + +## dbt-adapters 1.5.0 - September 10, 2024 + +### Features + +- Compare 'snapshot_get_time' and snapshot 'updated_at' data types ([#242](https://github.com/dbt-labs/dbt-adapters/issues/242)) +- Add Behavior Flag framework ([#281](https://github.com/dbt-labs/dbt-adapters/issues/281)) +- Add EventTimeFilter to BaseRelation, which renders a filtered relation when start or end is set ([#294](https://github.com/dbt-labs/dbt-adapters/issues/294)) + +### Dependencies + +- Update dbt-common pin to >=1.8 ([#299](https://github.com/dbt-labs/dbt-adapters/pull/299)) + +## dbt-adapters 1.4.1 - August 09, 2024 + +### Fixes + +- Use model alias for the CTE identifier generated during ephemeral materialization ([#5273](https://github.com/dbt-labs/dbt-adapters/issues/5273)) + +### Under the Hood + +- Updating changie.yaml to add contributors and PR links ([#219](https://github.com/dbt-labs/dbt-adapters/issues/219)) + +### Contributors +- [@jeancochrane](https://github.com/jeancochrane) ([#5273](https://github.com/dbt-labs/dbt-adapters/issues/5273)) +- [@leahwicz](https://github.com/leahwicz) ([#219](https://github.com/dbt-labs/dbt-adapters/issues/219)) + ## dbt-adapters 1.4.0 - July 30, 2024 ### Features diff --git a/dbt-tests-adapter/dbt/tests/__about__.py b/dbt-tests-adapter/dbt/tests/__about__.py index 1b022739..8c5bf7a0 100644 --- a/dbt-tests-adapter/dbt/tests/__about__.py +++ b/dbt-tests-adapter/dbt/tests/__about__.py @@ -1 +1 @@ -version = "1.9.2" +version = "1.10.1" diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py new file mode 100644 index 00000000..5bbabbe1 --- /dev/null +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -0,0 +1,96 @@ +import os +from pprint import pformat +from unittest import mock + +import pytest + +from dbt.tests.util import relation_from_name, run_dbt + +try: + # patch_microbatch_end_time introduced in dbt 1.9.0 + from dbt.tests.util import patch_microbatch_end_time +except ImportError: + from freezegun import freeze_time as patch_microbatch_end_time + +_input_model_sql = """ +{{ config(materialized='table', event_time='event_time') }} +select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time +union all +select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time +union all +select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time +""" + +_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +select * from {{ ref('input_model') }} +""" + + +class BaseMicrobatch: + @pytest.fixture(scope="class") + def microbatch_model_sql(self) -> str: + """ + This is the SQL that defines the microbatch model, including any {{ config(..) 
}} + """ + return _microbatch_model_sql + + @pytest.fixture(scope="class") + def input_model_sql(self) -> str: + """ + This is the SQL that defines the input model to the microbatch model, including any {{ config(..) }}. + event_time is a required configuration of this input + """ + return _input_model_sql + + @pytest.fixture(scope="class") + def insert_two_rows_sql(self, project) -> str: + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" + + @pytest.fixture(scope="class") + def models(self, microbatch_model_sql, input_model_sql): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select * from {relation}", fetch="all") + + assert len(result) == expected_row_count, f"{relation_name}:{pformat(result)}" + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project, insert_two_rows_sql): + # initial run -- backfills all data + with patch_microbatch_end_time("2020-01-03 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # our partition grain is "day" so running the same day without new data should produce the same results + with patch_microbatch_end_time("2020-01-03 14:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # add next two days of data + project.run_sql(insert_two_rows_sql) + + self.assert_row_count(project, "input_model", 5) + + # re-run without changing current time => no insert + with patch_microbatch_end_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 3) + + # re-run by advancing time by one day changing current time => insert 1 row + with patch_microbatch_end_time("2020-01-04 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 4) + + # re-run by advancing time by one more day changing current time => insert 1 more row + with patch_microbatch_end_time("2020-01-05 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 5) diff --git a/dbt-tests-adapter/pyproject.toml b/dbt-tests-adapter/pyproject.toml index 73bce49e..c9082d43 100644 --- a/dbt-tests-adapter/pyproject.toml +++ b/dbt-tests-adapter/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ # `dbt-core` takes the packages below as dependencies, so they are unpinned to avoid conflicts "dbt-adapters", "pyyaml", + "freezegun", ] [project.urls] Homepage = "https://github.com/dbt-labs/dbt-adapters" diff --git a/dbt/adapters/__about__.py b/dbt/adapters/__about__.py index d619c757..a55413d1 100644 --- a/dbt/adapters/__about__.py +++ b/dbt/adapters/__about__.py @@ -1 +1 @@ -version = "1.4.0" +version = "1.7.0" diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index e0627c47..f3788fe3 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -22,8 +22,9 @@ Union, TYPE_CHECKING, ) - +import os import pytz +from dbt_common.behavior_flags import Behavior, BehaviorFlag from dbt_common.clients.jinja import 
CallableMacroGenerator from dbt_common.contracts.constraints import ( ColumnLevelConstraint, @@ -54,7 +55,7 @@ BaseConnectionManager, Connection, ) -from dbt.adapters.base.meta import AdapterMeta, available +from dbt.adapters.base.meta import AdapterMeta, available, available_property from dbt.adapters.base.relation import ( BaseRelation, ComponentName, @@ -82,7 +83,6 @@ QuoteConfigTypeError, RelationReturnedMultipleResultsError, RenameToNoneAttemptedError, - SnapshotTargetIncompleteError, SnapshotTargetNotSnapshotTableError, UnexpectedNonTimestampError, ) @@ -261,7 +261,7 @@ class BaseAdapter(metaclass=AdapterMeta): MAX_SCHEMA_METADATA_RELATIONS = 100 - # This static member variable can be overriden in concrete adapter + # This static member variable can be overridden in concrete adapter # implementations to indicate adapter support for optional capabilities. _capabilities = CapabilityDict({}) @@ -271,6 +271,8 @@ def __init__(self, config, mp_context: SpawnContext) -> None: self.connections = self.ConnectionManager(config, mp_context) self._macro_resolver: Optional[MacroResolverProtocol] = None self._macro_context_generator: Optional[MacroContextGeneratorCallable] = None + # this will be updated to include global behavior flags once they exist + self.behavior = [] # type: ignore ### # Methods to set / access a macro resolver @@ -291,6 +293,27 @@ def set_macro_context_generator( ) -> None: self._macro_context_generator = macro_context_generator + @available_property + def behavior(self) -> Behavior: + return self._behavior + + @behavior.setter # type: ignore + def behavior(self, flags: List[BehaviorFlag]) -> None: + flags.extend(self._behavior_flags) + try: + # we don't always get project flags, for example during `dbt debug` + self._behavior = Behavior(flags, self.config.flags) + except AttributeError: + # in that case, don't load any behavior to avoid unexpected defaults + self._behavior = Behavior([], {}) + + @property + def _behavior_flags(self) -> List[BehaviorFlag]: + """ + This method should be overwritten by adapter maintainers to provide platform-specific flags + """ + return [] + ### # Methods that pass through to the connection manager ### @@ -740,7 +763,9 @@ def get_missing_columns( return [col for (col_name, col) in from_columns.items() if col_name in missing_columns] @available.parse_none - def valid_snapshot_target(self, relation: BaseRelation) -> None: + def valid_snapshot_target( + self, relation: BaseRelation, column_names: Optional[Dict[str, str]] = None + ) -> None: """Ensure that the target relation is valid, by making sure it has the expected columns. @@ -758,21 +783,16 @@ def valid_snapshot_target(self, relation: BaseRelation) -> None: columns = self.get_columns_in_relation(relation) names = set(c.name.lower() for c in columns) - expanded_keys = ("scd_id", "valid_from", "valid_to") - extra = [] missing = [] - for legacy in expanded_keys: - desired = "dbt_" + legacy + # Note: we're not checking dbt_updated_at here because it's not + # always present. 
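The `_behavior_flags` hook and `behavior` property introduced above are the extension points of the new Behavior Flag framework. Below is a minimal sketch of how a concrete adapter might use them; the flag name, its default, and the method that branches on it are hypothetical, while the dict shape mirrors the `BehaviorFlag` fixtures in the unit tests added later in this diff:

```python
from typing import List

from dbt_common.behavior_flags import BehaviorFlag

from dbt.adapters.base.impl import BaseAdapter


class MyAdapter(BaseAdapter):
    @property
    def _behavior_flags(self) -> List[BehaviorFlag]:
        # hypothetical flag; users opt in via the `flags` block in dbt_project.yml
        return [
            {
                "name": "enable_fast_relation_listing",
                "default": False,
                "description": "List relations with a single bulk metadata query.",
                "docs_url": "https://docs.getdbt.com/",
            }
        ]

    def list_relations_without_caching(self, schema_relation):
        if self.behavior.enable_fast_relation_listing:  # truthy once the user opts in
            ...  # new implementation would go here
        ...  # legacy implementation remains the default
```

The setter above appends these adapter-specific flags to whatever global flags are registered and resolves user overrides from the project-level `flags` config.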
+ for column in ("dbt_scd_id", "dbt_valid_from", "dbt_valid_to"): + desired = column_names[column] if column_names else column if desired not in names: missing.append(desired) - if legacy in names: - extra.append(legacy) if missing: - if extra: - raise SnapshotTargetIncompleteError(extra, missing) - else: - raise SnapshotTargetNotSnapshotTableError(missing) + raise SnapshotTargetNotSnapshotTableError(missing) @available.parse_none def expand_target_column_types( @@ -1549,7 +1569,11 @@ def valid_incremental_strategies(self): return ["append"] def builtin_incremental_strategies(self): - return ["append", "delete+insert", "merge", "insert_overwrite"] + builtin_strategies = ["append", "delete+insert", "merge", "insert_overwrite"] + if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"): + builtin_strategies.append("microbatch") + + return builtin_strategies @available.parse_none def get_incremental_strategy_macro(self, model_context, strategy: str): diff --git a/dbt/adapters/base/meta.py b/dbt/adapters/base/meta.py index ca7aef9f..e522a056 100644 --- a/dbt/adapters/base/meta.py +++ b/dbt/adapters/base/meta.py @@ -92,6 +92,25 @@ def parse_list(self, func: Callable) -> Callable: available = _Available() +class available_property(property): + """ + This supports making dynamic properties (`@property`) available in the jinja context. + + We use `@available` to make methods available in the jinja context, but this mechanism relies on the method being callable. + Intuitively, we should be able to use both `@available` and `@property` to create a dynamic property that's available in the jinja context. + + Using the `@property` decorator as the inner decorator supplies `@available` with something that is not callable. + Instead of returning the method, `@property` returns the value itself, not the method that is called to create the value. + + Using the `@available` decorator as the inner decorator adds `_is_available_ = True` to the function. + However, when the `@property` decorator executes, it returns a `property` object which does not have the `_is_available_` attribute. + + This decorator solves this problem by simply adding `_is_available_ = True` as an attribute on the `property` built-in. + """ + + _is_available_ = True + + class AdapterMeta(abc.ABCMeta): _available_: FrozenSet[str] _parse_replacements_: Dict[str, Callable] diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index 1aab7b2f..80dbd34b 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -1,5 +1,6 @@ from collections.abc import Hashable from dataclasses import dataclass, field +from datetime import datetime from typing import ( Any, Dict, @@ -36,6 +37,13 @@ SerializableIterable = Union[Tuple, FrozenSet] +@dataclass +class EventTimeFilter(FakeAPIObject): + field_name: str + start: Optional[datetime] = None + end: Optional[datetime] = None + + @dataclass(frozen=True, eq=False, repr=False) class BaseRelation(FakeAPIObject, Hashable): path: Path @@ -47,6 +55,7 @@ class BaseRelation(FakeAPIObject, Hashable): quote_policy: Policy = field(default_factory=lambda: Policy()) dbt_created: bool = False limit: Optional[int] = None + event_time_filter: Optional[EventTimeFilter] = None require_alias: bool = ( True # used to govern whether to add an alias when render_limited is called ) @@ -208,14 +217,19 @@ def render(self) -> str: # if there is nothing set, this will return the empty string. 
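The `available_property` decorator added to `meta.py` above fills the gap its docstring describes: stacking `@available` and `@property` in either order fails to mark a dynamic property as jinja-accessible. A sketch of the intended usage pattern; `connection_count` is a hypothetical property (the diff itself applies the decorator to `BaseAdapter.behavior`):

```python
from dbt.adapters.base.impl import BaseAdapter
from dbt.adapters.base.meta import available_property


class SomeAdapter(BaseAdapter):
    @available_property
    def connection_count(self) -> int:
        # recomputed on every access, and reachable in jinja
        # as {{ adapter.connection_count }}
        return len(self.connections.thread_connections)
```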
return ".".join(part for _, part in self._render_iterator() if part is not None) - def _render_limited_alias(self) -> str: + def _render_subquery_alias(self, namespace: str) -> str: """Some databases require an alias for subqueries (postgres, mysql) for all others we want to avoid adding an alias as it has the potential to introduce issues with the query if the user also defines an alias. """ if self.require_alias: - return f" _dbt_limit_subq_{self.table}" + return f" _dbt_{namespace}_subq_{self.table}" return "" + def _render_limited_alias( + self, + ) -> str: + return self._render_subquery_alias(namespace="limit") + def render_limited(self) -> str: rendered = self.render() if self.limit is None: @@ -225,6 +239,31 @@ def render_limited(self) -> str: else: return f"(select * from {rendered} limit {self.limit}){self._render_limited_alias()}" + def render_event_time_filtered(self, rendered: Optional[str] = None) -> str: + rendered = rendered or self.render() + if self.event_time_filter is None: + return rendered + + filter = self._render_event_time_filtered(self.event_time_filter) + if not filter: + return rendered + + return f"(select * from {rendered} where {filter}){self._render_subquery_alias(namespace='et_filter')}" + + def _render_event_time_filtered(self, event_time_filter: EventTimeFilter) -> str: + """ + Returns "" if start and end are both None + """ + filter = "" + if event_time_filter.start and event_time_filter.end: + filter = f"{event_time_filter.field_name} >= '{event_time_filter.start}' and {event_time_filter.field_name} < '{event_time_filter.end}'" + elif event_time_filter.start: + filter = f"{event_time_filter.field_name} >= '{event_time_filter.start}'" + elif event_time_filter.end: + filter = f"{event_time_filter.field_name} < '{event_time_filter.end}'" + + return filter + def quoted(self, identifier): return "{quote_char}{identifier}{quote_char}".format( quote_char=self.quote_character, @@ -240,6 +279,7 @@ def create_ephemeral_from( cls: Type[Self], relation_config: RelationConfig, limit: Optional[int] = None, + event_time_filter: Optional[EventTimeFilter] = None, ) -> Self: # Note that ephemeral models are based on the identifier, which will # point to the model's alias if one exists and otherwise fall back to @@ -250,6 +290,7 @@ def create_ephemeral_from( type=cls.CTE, identifier=identifier, limit=limit, + event_time_filter=event_time_filter, ).quote(identifier=False) @classmethod @@ -315,7 +356,14 @@ def __hash__(self) -> int: return hash(self.render()) def __str__(self) -> str: - return self.render() if self.limit is None else self.render_limited() + rendered = self.render() if self.limit is None else self.render_limited() + + # Limited subquery is wrapped by the event time filter subquery, and not the other way around. + # This is because in the context of resolving limited refs, we care more about performance than reliably producing a sample of a certain size. 
+ if self.event_time_filter: + rendered = self.render_event_time_filtered(rendered) + + return rendered @property def database(self) -> Optional[str]: diff --git a/dbt/adapters/exceptions/compilation.py b/dbt/adapters/exceptions/compilation.py index 46ca5219..d82924e3 100644 --- a/dbt/adapters/exceptions/compilation.py +++ b/dbt/adapters/exceptions/compilation.py @@ -150,8 +150,10 @@ def __init__(self, missing: List): super().__init__(msg=self.get_message()) def get_message(self) -> str: - msg = 'Snapshot target is not a snapshot table (missing "{}")'.format( - '", "'.join(self.missing) + missing = '", "'.join(self.missing) + msg = ( + f'Snapshot target is missing configured columns (missing "{missing}"). ' + "See https://docs.getdbt.com/docs/build/snapshots#snapshot-meta-fields for more information." ) return msg diff --git a/dbt/adapters/factory.py b/dbt/adapters/factory.py index b1854f67..8a018619 100644 --- a/dbt/adapters/factory.py +++ b/dbt/adapters/factory.py @@ -188,7 +188,7 @@ def get_include_paths(self, name: Optional[str]) -> List[Path]: def get_adapter_type_names(self, name: Optional[str]) -> List[str]: return [p.adapter.type() for p in self.get_adapter_plugins(name)] - def get_adapter_constraint_support(self, name: Optional[str]) -> List[str]: + def get_adapter_constraint_support(self, name: Optional[str]) -> Dict[str, str]: return self.lookup_adapter(name).CONSTRAINT_SUPPORT # type: ignore @@ -251,7 +251,7 @@ def get_adapter_type_names(name: Optional[str]) -> List[str]: return FACTORY.get_adapter_type_names(name) -def get_adapter_constraint_support(name: Optional[str]) -> List[str]: +def get_adapter_constraint_support(name: Optional[str]) -> Dict[str, str]: return FACTORY.get_adapter_constraint_support(name) diff --git a/dbt/include/global_project/macros/adapters/timestamps.sql b/dbt/include/global_project/macros/adapters/timestamps.sql index 64b5fd3d..c936c844 100644 --- a/dbt/include/global_project/macros/adapters/timestamps.sql +++ b/dbt/include/global_project/macros/adapters/timestamps.sql @@ -15,6 +15,14 @@ {{ current_timestamp() }} {% endmacro %} +{% macro get_snapshot_get_time_data_type() %} + {% set snapshot_time = adapter.dispatch('snapshot_get_time', 'dbt')() %} + {% set time_data_type_sql = 'select ' ~ snapshot_time ~ ' as dbt_snapshot_time' %} + {% set snapshot_time_column_schema = get_column_schema_from_query(time_data_type_sql) %} + {% set time_data_type = snapshot_time_column_schema[0].dtype %} + {{ return(time_data_type or none) }} +{% endmacro %} + --------------------------------------------- /* {# diff --git a/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql b/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql index 72082cca..111d3887 100644 --- a/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql +++ b/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql @@ -66,6 +66,19 @@ {% endmacro %} +{% macro get_incremental_microbatch_sql(arg_dict) %} + + {{ return(adapter.dispatch('get_incremental_microbatch_sql', 'dbt')(arg_dict)) }} + +{% endmacro %} + +{% macro default__get_incremental_microbatch_sql(arg_dict) %} + + {{ exceptions.raise_not_implemented('microbatch materialization strategy not implemented for adapter ' + adapter.type()) }} + +{% endmacro %} + + {% macro get_insert_into_sql(target_relation, temp_relation, dest_columns) %} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} diff --git 
a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index bb71974c..8d982855 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -34,7 +34,12 @@ {{ adapter.dispatch('snapshot_staging_table', 'dbt')(strategy, source_sql, target_relation) }} {% endmacro %} +{% macro get_snapshot_table_column_names() %} + {{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at'}) }} +{% endmacro %} + {% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%} + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} with snapshot_query as ( @@ -48,7 +53,7 @@ {{ strategy.unique_key }} as dbt_unique_key from {{ target_relation }} - where dbt_valid_to is null + where {{ columns.dbt_valid_to }} is null ), @@ -57,10 +62,10 @@ select *, {{ strategy.unique_key }} as dbt_unique_key, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to, - {{ strategy.scd_id }} as dbt_scd_id + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as {{ columns.dbt_valid_to }}, + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }} from snapshot_query ), @@ -70,9 +75,9 @@ select *, {{ strategy.unique_key }} as dbt_unique_key, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - {{ strategy.updated_at }} as dbt_valid_to + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_to }} from snapshot_query ), @@ -111,7 +116,7 @@ select 'update' as dbt_change_type, source_data.*, - snapshotted_data.dbt_scd_id + snapshotted_data.{{ columns.dbt_scd_id }} from updates_source_data as source_data join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key @@ -128,10 +133,10 @@ select 'delete' as dbt_change_type, source_data.*, - {{ snapshot_get_time() }} as dbt_valid_from, - {{ snapshot_get_time() }} as dbt_updated_at, - {{ snapshot_get_time() }} as dbt_valid_to, - snapshotted_data.dbt_scd_id + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }}, + snapshotted_data.{{ columns.dbt_scd_id }} from snapshotted_data left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key @@ -155,12 +160,13 @@ {% endmacro %} {% macro default__build_snapshot_table(strategy, sql) %} + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} select *, - {{ strategy.scd_id }} as dbt_scd_id, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + nullif({{ strategy.updated_at }}, {{ 
strategy.updated_at }}) as {{ columns.dbt_valid_to }} from ( {{ sql }} ) sbq @@ -179,3 +185,28 @@ {% do return(temp_relation) %} {% endmacro %} + + +{% macro get_updated_at_column_data_type(snapshot_sql) %} + {% set snapshot_sql_column_schema = get_column_schema_from_query(snapshot_sql) %} + {% set dbt_updated_at_data_type = null %} + {% set ns = namespace() -%} {#-- handle for-loop scoping with a namespace --#} + {% set ns.dbt_updated_at_data_type = null -%} + {% for column in snapshot_sql_column_schema %} + {% if ((column.column == 'dbt_updated_at') or (column.column == 'DBT_UPDATED_AT')) %} + {% set ns.dbt_updated_at_data_type = column.dtype %} + {% endif %} + {% endfor %} + {{ return(ns.dbt_updated_at_data_type or none) }} +{% endmacro %} + + +{% macro check_time_data_types(sql) %} + {% set dbt_updated_at_data_type = get_updated_at_column_data_type(sql) %} + {% set snapshot_get_time_data_type = get_snapshot_get_time_data_type() %} + {% if snapshot_get_time_data_type is not none and dbt_updated_at_data_type is not none and snapshot_get_time_data_type != dbt_updated_at_data_type %} + {% if exceptions.warn_snapshot_timestamp_data_types %} + {{ exceptions.warn_snapshot_timestamp_data_types(snapshot_get_time_data_type, dbt_updated_at_data_type) }} + {% endif %} + {% endif %} +{% endmacro %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql index b0fe9222..5daead4c 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql @@ -1,5 +1,4 @@ {% materialization snapshot, default %} - {%- set config = model['config'] -%} {%- set target_table = model.get('alias', model.get('name')) -%} @@ -24,17 +23,23 @@ {{ run_hooks(pre_hooks, inside_transaction=True) }} {% set strategy_macro = strategy_dispatch(strategy_name) %} - {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %} + {# The model['config'] parameter below is no longer used, but passing anyway for compatibility #} + {# It was a dictionary of config, instead of the config object from the context #} + {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %} {% if not target_relation_exists %} {% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %} + {% set build_or_select_sql = build_sql %} {% set final_sql = create_table_as(False, target_relation, build_sql) %} {% else %} - {{ adapter.valid_snapshot_target(target_relation) }} + {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} + {{ adapter.valid_snapshot_target(target_relation, columns) }} + + {% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %} {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %} -- this may no-op if the database does not require column expansion @@ -71,6 +76,9 @@ {% endif %} + + {{ check_time_data_types(build_or_select_sql) }} + {% call statement('main') %} {{ final_sql }} {% endcall %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql index 56798811..74494ed2 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql +++ 
b/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql @@ -7,15 +7,17 @@ {% macro default__snapshot_merge_sql(target, source, insert_cols) -%} {%- set insert_cols_csv = insert_cols | join(', ') -%} + {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%} + merge into {{ target.render() }} as DBT_INTERNAL_DEST using {{ source }} as DBT_INTERNAL_SOURCE - on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id + on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }} when matched - and DBT_INTERNAL_DEST.dbt_valid_to is null + and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete') then update - set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to + set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }} when not matched and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert' diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index d22cc336..8c086182 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -49,10 +49,13 @@ {# Core strategy definitions #} -{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} - {% set primary_key = config['unique_key'] %} - {% set updated_at = config['updated_at'] %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} + +{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, model_config, target_exists) %} + {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} + {% set primary_key = config.get('unique_key') %} + {% set updated_at = config.get('updated_at') %} + {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %} + {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} {#/* The snapshot relation might not have an {{ updated_at }} value if the @@ -64,7 +67,7 @@ See https://github.com/dbt-labs/dbt-core/issues/2350 */ #} {% set row_changed_expr -%} - ({{ snapshotted_rel }}.dbt_valid_from < {{ current_rel }}.{{ updated_at }}) + ({{ snapshotted_rel }}.{{ columns.dbt_valid_from }} < {{ current_rel }}.{{ updated_at }}) {%- endset %} {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} @@ -133,11 +136,12 @@ {%- endmacro %} -{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} - {% set check_cols_config = config['check_cols'] %} - {% set primary_key = config['unique_key'] %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} - {% set updated_at = config.get('updated_at', snapshot_get_time()) %} +{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, model_config, target_exists) %} + {# The model_config parameter is no longer used, but is passed in anyway for compatibility. 
#} + {% set check_cols_config = config.get('check_cols') %} + {% set primary_key = config.get('unique_key') %} + {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %} + {% set updated_at = config.get('updated_at') or snapshot_get_time() %} {% set column_added = false %} diff --git a/pyproject.toml b/pyproject.toml index e794781c..76ca3dee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ classifiers = [ "Programming Language :: Python :: 3.12", ] dependencies = [ - "dbt-common>=1.6,<2.0", + "dbt-common>=1.10,<2.0", "pytz>=2015.7", # installed via dbt-common but used directly "agate>=1.0,<2.0", diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py new file mode 100644 index 00000000..346634df --- /dev/null +++ b/tests/unit/conftest.py @@ -0,0 +1 @@ +from tests.unit.fixtures import adapter, behavior_flags, config, flags diff --git a/tests/unit/fixtures/__init__.py b/tests/unit/fixtures/__init__.py new file mode 100644 index 00000000..78135a2c --- /dev/null +++ b/tests/unit/fixtures/__init__.py @@ -0,0 +1 @@ +from tests.unit.fixtures.adapter import adapter, behavior_flags, config, flags diff --git a/tests/unit/fixtures/adapter.py b/tests/unit/fixtures/adapter.py new file mode 100644 index 00000000..b59b0423 --- /dev/null +++ b/tests/unit/fixtures/adapter.py @@ -0,0 +1,146 @@ +from multiprocessing import get_context +from types import SimpleNamespace +from typing import Any, Dict, List + +import agate +from dbt_common.behavior_flags import BehaviorFlag +import pytest + +from dbt.adapters.base.column import Column +from dbt.adapters.base.impl import BaseAdapter +from dbt.adapters.base.relation import BaseRelation +from dbt.adapters.contracts.connection import AdapterRequiredConfig, QueryComment + +from tests.unit.fixtures.connection_manager import ConnectionManagerStub +from tests.unit.fixtures.credentials import CredentialsStub + + +@pytest.fixture +def adapter(config, behavior_flags) -> BaseAdapter: + + class BaseAdapterStub(BaseAdapter): + """ + A stub for an adapter that uses the cache as the database + """ + + ConnectionManager = ConnectionManagerStub + + @property + def _behavior_flags(self) -> List[BehaviorFlag]: + return behavior_flags + + ### + # Abstract methods for database-specific values, attributes, and types + ### + @classmethod + def date_function(cls) -> str: + return "date_function" + + @classmethod + def is_cancelable(cls) -> bool: + return False + + def list_schemas(self, database: str) -> List[str]: + return list(self.cache.schemas) + + ### + # Abstract methods about relations + ### + def drop_relation(self, relation: BaseRelation) -> None: + self.cache_dropped(relation) + + def truncate_relation(self, relation: BaseRelation) -> None: + self.cache_dropped(relation) + + def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None: + self.cache_renamed(from_relation, to_relation) + + def get_columns_in_relation(self, relation: BaseRelation) -> List[Column]: + # there's no database, so these need to be added as kwargs in the existing_relations fixture + return relation.columns + + def expand_column_types(self, goal: BaseRelation, current: BaseRelation) -> None: + # there's no database, so these need to be added as kwargs in the existing_relations fixture + object.__setattr__(current, "columns", goal.columns) + + def list_relations_without_caching( + self, schema_relation: BaseRelation + ) -> List[BaseRelation]: + # there's no database, so use the cache as the database + return 
self.cache.get_relations(schema_relation.database, schema_relation.schema) + + ### + # ODBC FUNCTIONS -- these should not need to change for every adapter, + # although some adapters may override them + ### + def create_schema(self, relation: BaseRelation): + # there's no database, this happens implicitly by adding a relation to the cache + pass + + def drop_schema(self, relation: BaseRelation): + for each_relation in self.cache.get_relations(relation.database, relation.schema): + self.cache_dropped(each_relation) + + @classmethod + def quote(cls, identifier: str) -> str: + quote_char = "" + return f"{quote_char}{identifier}{quote_char}" + + ### + # Conversions: These must be implemented by concrete implementations, for + # converting agate types into their sql equivalents. + ### + @classmethod + def convert_text_type(cls, agate_table: agate.Table, col_idx: int) -> str: + return "str" + + @classmethod + def convert_number_type(cls, agate_table: agate.Table, col_idx: int) -> str: + return "float" + + @classmethod + def convert_boolean_type(cls, agate_table: agate.Table, col_idx: int) -> str: + return "bool" + + @classmethod + def convert_datetime_type(cls, agate_table: agate.Table, col_idx: int) -> str: + return "datetime" + + @classmethod + def convert_date_type(cls, *args, **kwargs): + return "date" + + @classmethod + def convert_time_type(cls, *args, **kwargs): + return "time" + + return BaseAdapterStub(config, get_context("spawn")) + + +@pytest.fixture +def config(flags) -> AdapterRequiredConfig: + raw_config = { + "credentials": CredentialsStub("test_database", "test_schema"), + "profile_name": "test_profile", + "target_name": "test_target", + "threads": 4, + "project_name": "test_project", + "query_comment": QueryComment(), + "cli_vars": {}, + "target_path": "path/to/nowhere", + "log_cache_events": False, + "flags": flags, + } + return SimpleNamespace(**raw_config) + + +@pytest.fixture +def flags() -> Dict[str, Any]: + # this is the flags collection in dbt_project.yaml + return {} + + +@pytest.fixture +def behavior_flags() -> List[BehaviorFlag]: + # this is the collection of behavior flags for a specific adapter + return [] diff --git a/tests/unit/fixtures/connection_manager.py b/tests/unit/fixtures/connection_manager.py new file mode 100644 index 00000000..8b353fbe --- /dev/null +++ b/tests/unit/fixtures/connection_manager.py @@ -0,0 +1,58 @@ +from contextlib import contextmanager +from typing import ContextManager, List, Optional, Tuple + +import agate + +from dbt.adapters.base.connections import BaseConnectionManager +from dbt.adapters.contracts.connection import AdapterResponse, Connection, ConnectionState + + +class ConnectionManagerStub(BaseConnectionManager): + """ + A stub for a connection manager that does not connect to a database + """ + + raised_exceptions: List[Exception] + + @contextmanager + def exception_handler(self, sql: str) -> ContextManager: # type: ignore + # catch all exceptions and put them on this class for inspection in tests + try: + yield + except Exception as exc: + self.raised_exceptions.append(exc) + finally: + pass + + def cancel_open(self) -> Optional[List[str]]: + names = [] + for connection in self.thread_connections.values(): + if connection.state == ConnectionState.OPEN: + connection.state = ConnectionState.CLOSED + if name := connection.name: + names.append(name) + return names + + @classmethod + def open(cls, connection: Connection) -> Connection: + # there's no database, so just change the state + connection.state = ConnectionState.OPEN + return 
connection + + def begin(self) -> None: + # there's no database, so there are no transactions + pass + + def commit(self) -> None: + # there's no database, so there are no transactions + pass + + def execute( + self, + sql: str, + auto_begin: bool = False, + fetch: bool = False, + limit: Optional[int] = None, + ) -> Tuple[AdapterResponse, agate.Table]: + # there's no database, so just return the sql + return AdapterResponse(_message="", code=sql), agate.Table([]) diff --git a/tests/unit/fixtures/credentials.py new file mode 100644 index 00000000..88817f6b --- /dev/null +++ b/tests/unit/fixtures/credentials.py @@ -0,0 +1,13 @@ +from dbt.adapters.contracts.connection import Credentials + + +class CredentialsStub(Credentials): + """ + A stub for database credentials that does not connect to a database + """ + + def type(self) -> str: + return "test" + + def _connection_keys(self): + return {"database": self.database, "schema": self.schema} diff --git a/tests/unit/test_behavior_flags.py new file mode 100644 index 00000000..7f3abb89 --- /dev/null +++ b/tests/unit/test_behavior_flags.py @@ -0,0 +1,66 @@ +from typing import Any, Dict, List + +from dbt_common.behavior_flags import BehaviorFlag +from dbt_common.exceptions import DbtBaseException +import pytest + + +@pytest.fixture +def flags() -> Dict[str, Any]: + return { + "unregistered_flag": True, + "default_false_user_false_flag": False, + "default_false_user_true_flag": True, + "default_true_user_false_flag": False, + "default_true_user_true_flag": True, + } + + +@pytest.fixture +def behavior_flags() -> List[BehaviorFlag]: + return [ + { + "name": "default_false_user_false_flag", + "default": False, + "docs_url": "https://docs.com", + }, + { + "name": "default_false_user_true_flag", + "default": False, + "description": "This is a false flag.", + }, + { + "name": "default_false_user_skip_flag", + "default": False, + "description": "This is a true flag.", + }, + { + "name": "default_true_user_false_flag", + "default": True, + "description": "This is fake news.", + }, + { + "name": "default_true_user_true_flag", + "default": True, + "docs_url": "https://moar.docs.com", + }, + { + "name": "default_true_user_skip_flag", + "default": True, + "description": "This is a true flag.", + }, + ] + + +def test_register_behavior_flags(adapter): + # make sure that users cannot add arbitrary flags to this collection + with pytest.raises(DbtBaseException): + assert adapter.behavior.unregistered_flag + + # check the values of the valid behavior flags + assert not adapter.behavior.default_false_user_false_flag + assert adapter.behavior.default_false_user_true_flag + assert not adapter.behavior.default_false_user_skip_flag + assert not adapter.behavior.default_true_user_false_flag + assert adapter.behavior.default_true_user_true_flag + assert adapter.behavior.default_true_user_skip_flag diff --git a/tests/unit/test_relation.py index 97d56419..6d835e0d 100644 --- a/tests/unit/test_relation.py +++ b/tests/unit/test_relation.py @@ -1,8 +1,9 @@ from dataclasses import dataclass, replace - +from datetime import datetime import pytest from dbt.adapters.base import BaseRelation +from dbt.adapters.base.relation import EventTimeFilter from dbt.adapters.contracts.relation import RelationType @@ -81,6 +82,80 @@ def test_render_limited(limit, require_alias, expected_result): assert str(my_relation) == expected_result +@pytest.mark.parametrize( +
"event_time_filter,require_alias,expected_result", + [ + (None, False, '"test_database"."test_schema"."test_identifier"'), + ( + EventTimeFilter(field_name="column"), + False, + '"test_database"."test_schema"."test_identifier"', + ), + (None, True, '"test_database"."test_schema"."test_identifier"'), + ( + EventTimeFilter(field_name="column"), + True, + '"test_database"."test_schema"."test_identifier"', + ), + ( + EventTimeFilter(field_name="column", start=datetime(year=2020, month=1, day=1)), + False, + """(select * from "test_database"."test_schema"."test_identifier" where column >= '2020-01-01 00:00:00')""", + ), + ( + EventTimeFilter(field_name="column", start=datetime(year=2020, month=1, day=1)), + True, + """(select * from "test_database"."test_schema"."test_identifier" where column >= '2020-01-01 00:00:00') _dbt_et_filter_subq_test_identifier""", + ), + ( + EventTimeFilter(field_name="column", end=datetime(year=2020, month=1, day=1)), + False, + """(select * from "test_database"."test_schema"."test_identifier" where column < '2020-01-01 00:00:00')""", + ), + ( + EventTimeFilter( + field_name="column", + start=datetime(year=2020, month=1, day=1), + end=datetime(year=2020, month=1, day=2), + ), + False, + """(select * from "test_database"."test_schema"."test_identifier" where column >= '2020-01-01 00:00:00' and column < '2020-01-02 00:00:00')""", + ), + ], +) +def test_render_event_time_filtered(event_time_filter, require_alias, expected_result): + my_relation = BaseRelation.create( + database="test_database", + schema="test_schema", + identifier="test_identifier", + event_time_filter=event_time_filter, + require_alias=require_alias, + ) + actual_result = my_relation.render_event_time_filtered() + assert actual_result == expected_result + assert str(my_relation) == expected_result + + +def test_render_event_time_filtered_and_limited(): + my_relation = BaseRelation.create( + database="test_database", + schema="test_schema", + identifier="test_identifier", + event_time_filter=EventTimeFilter( + field_name="column", + start=datetime(year=2020, month=1, day=1), + end=datetime(year=2020, month=1, day=2), + ), + limit=0, + require_alias=False, + ) + expected_result = """(select * from (select * from "test_database"."test_schema"."test_identifier" where false limit 0) where column >= '2020-01-01 00:00:00' and column < '2020-01-02 00:00:00')""" + + actual_result = my_relation.render_event_time_filtered(my_relation.render_limited()) + assert actual_result == expected_result + assert str(my_relation) == expected_result + + def test_create_ephemeral_from_uses_identifier(): @dataclass class Node: