diff --git a/.changes/1.10.0.md b/.changes/1.10.0.md new file mode 100644 index 00000000..10fc93bc --- /dev/null +++ b/.changes/1.10.0.md @@ -0,0 +1 @@ +## dbt-adapters 1.10.0 - September 12, 2024 diff --git a/.changes/1.10.1.md b/.changes/1.10.1.md new file mode 100644 index 00000000..01d6fda2 --- /dev/null +++ b/.changes/1.10.1.md @@ -0,0 +1 @@ +## dbt-adapters 1.10.1 - September 16, 2024 diff --git a/.changes/1.10.2.md b/.changes/1.10.2.md new file mode 100644 index 00000000..09244521 --- /dev/null +++ b/.changes/1.10.2.md @@ -0,0 +1,5 @@ +## dbt-adapters 1.10.2 - October 01, 2024 + +### Under the Hood + +- dbt-tests-adapters: Add required begin to microbatch model config to BaseMicrobatch test ([#315](https://github.com/dbt-labs/dbt-adapters/issues/315)) diff --git a/.changes/1.10.3.md b/.changes/1.10.3.md new file mode 100644 index 00000000..29844ce2 --- /dev/null +++ b/.changes/1.10.3.md @@ -0,0 +1 @@ +## dbt-adapters 1.10.3 - October 29, 2024 diff --git a/.changes/1.4.1.md b/.changes/1.4.1.md new file mode 100644 index 00000000..82a731a2 --- /dev/null +++ b/.changes/1.4.1.md @@ -0,0 +1,13 @@ +## dbt-adapters 1.4.1 - August 09, 2024 + +### Fixes + +- Use model alias for the CTE identifier generated during ephemeral materialization ([#5273](https://github.com/dbt-labs/dbt-adapters/issues/5273)) + +### Under the Hood + +- Updating changie.yaml to add contributors and PR links ([#219](https://github.com/dbt-labs/dbt-adapters/issues/219)) + +### Contributors +- [@jeancochrane](https://github.com/jeancochrane) ([#5273](https://github.com/dbt-labs/dbt-adapters/issues/5273)) +- [@leahwicz](https://github.com/leahwicz) ([#219](https://github.com/dbt-labs/dbt-adapters/issues/219)) diff --git a/.changes/1.5.0.md b/.changes/1.5.0.md new file mode 100644 index 00000000..e4c54bdc --- /dev/null +++ b/.changes/1.5.0.md @@ -0,0 +1,11 @@ +## dbt-adapters 1.5.0 - September 10, 2024 + +### Features + +- Compare 'snapshot_get_time' and snapshot 'updated_at' data types ([#242](https://github.com/dbt-labs/dbt-adapters/issues/242)) +- Add Behavior Flag framework ([#281](https://github.com/dbt-labs/dbt-adapters/issues/281)) +- Add EventTimeFilter to BaseRelation, which renders a filtered relation when start or end is set ([#294](https://github.com/dbt-labs/dbt-adapters/issues/294)) + +### Dependencies + +- Update dbt-common pin to >=1.8 ([#299](https://github.com/dbt-labs/dbt-adapters/pull/299)) diff --git a/.changes/1.6.0.md b/.changes/1.6.0.md new file mode 100644 index 00000000..c109454a --- /dev/null +++ b/.changes/1.6.0.md @@ -0,0 +1,5 @@ +## dbt-adapters 1.6.0 - September 12, 2024 + +### Features + +- Default microbatch strategy implementation and base tests ([#302](https://github.com/dbt-labs/dbt-adapters/issues/302)) diff --git a/.changes/1.6.1.md b/.changes/1.6.1.md new file mode 100644 index 00000000..45b41bcd --- /dev/null +++ b/.changes/1.6.1.md @@ -0,0 +1 @@ +## dbt-adapters 1.6.1 - September 16, 2024 diff --git a/.changes/1.7.0.md b/.changes/1.7.0.md new file mode 100644 index 00000000..efbdc601 --- /dev/null +++ b/.changes/1.7.0.md @@ -0,0 +1,5 @@ +## dbt-adapters 1.7.0 - September 19, 2024 + +### Features + +- Allow configuring of snapshot column names ([#289](https://github.com/dbt-labs/dbt-adapters/issues/289)) diff --git a/.changes/1.7.1.md b/.changes/1.7.1.md new file mode 100644 index 00000000..4acda7f8 --- /dev/null +++ b/.changes/1.7.1.md @@ -0,0 +1,5 @@ +## dbt-adapters 1.7.1 - October 15, 2024 + +### Features + +- Enable setting current value of dbt_valid_to 
([#320](https://github.com/dbt-labs/dbt-adapters/issues/320)) diff --git a/.changes/1.7.2.md b/.changes/1.7.2.md new file mode 100644 index 00000000..b17bf217 --- /dev/null +++ b/.changes/1.7.2.md @@ -0,0 +1,16 @@ +## dbt-adapters 1.7.2 - October 21, 2024 + +### Breaking Changes + +- Drop support for Python 3.8 ([#332](https://github.com/dbt-labs/dbt-adapters/issues/332)) + +### Features + +- Allows unique_key for snapshots to take a list ([#181](https://github.com/dbt-labs/dbt-adapters/issues/181)) + +### Fixes + +- Always validate an incremental model's `incremental_strategy` ([#330](https://github.com/dbt-labs/dbt-adapters/issues/330)) + +### Contributors +- [@agpapa](https://github.com/agpapa) ([#181](https://github.com/dbt-labs/dbt-adapters/issues/181)) diff --git a/.changes/1.8.0.md b/.changes/1.8.0.md new file mode 100644 index 00000000..f73a0300 --- /dev/null +++ b/.changes/1.8.0.md @@ -0,0 +1,9 @@ +## dbt-adapters 1.8.0 - October 29, 2024 + +### Fixes + +- Always make behavior flags available for evaluation ([#338](https://github.com/dbt-labs/dbt-adapters/issues/338)) + +### Under the Hood + +- Add adapter telemetry. ([#301](https://github.com/dbt-labs/dbt-adapters/issues/301)) diff --git a/.changes/unreleased/Fixes-20240610-195300.yaml b/.changes/unreleased/Fixes-20240610-195300.yaml deleted file mode 100644 index 1f8cd5a5..00000000 --- a/.changes/unreleased/Fixes-20240610-195300.yaml +++ /dev/null @@ -1,6 +0,0 @@ -kind: Fixes -body: Use model alias for the CTE identifier generated during ephemeral materialization -time: 2024-06-10T19:53:00.086488231Z -custom: - Author: jeancochrane - Issue: "5273" diff --git a/.changes/unreleased/Under the Hood-20240801-220551.yaml b/.changes/unreleased/Under the Hood-20240801-220551.yaml deleted file mode 100644 index 25b54a65..00000000 --- a/.changes/unreleased/Under the Hood-20240801-220551.yaml +++ /dev/null @@ -1,6 +0,0 @@ -kind: Under the Hood -body: Updating changie.yaml to add contributors and PR links -time: 2024-08-01T22:05:51.327652-04:00 -custom: - Author: leahwicz - Issue: "219" diff --git a/.github/actions/publish-results/action.yml b/.github/actions/publish-results/action.yml index 0d5cb7e6..7c73a94f 100644 --- a/.github/actions/publish-results/action.yml +++ b/.github/actions/publish-results/action.yml @@ -5,7 +5,7 @@ inputs: description: File type for file name stub (e.g. "unit-tests") required: true python-version: - description: Python version for the file name stub (e.g. "3.8") + description: Python version for the file name stub (e.g. 
"3.9") required: true source-file: description: File to be uploaded diff --git a/.github/workflows/precommit-autoupdate.yml b/.github/workflows/precommit-autoupdate.yml new file mode 100644 index 00000000..74976c48 --- /dev/null +++ b/.github/workflows/precommit-autoupdate.yml @@ -0,0 +1,22 @@ +name: "Run pre-commit autoupdate" + +on: + schedule: + - cron: "30 1 * * SAT" + workflow_dispatch: + +permissions: + contents: write + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.sha }} + cancel-in-progress: true + +jobs: + precommit-autoupdate: + name: "Run pre-commit autoupdate" + uses: dbt-labs/actions/.github/workflows/pre-commit-autoupdate.yml + secrets: + TOKEN: ${{ secrets.FISHTOWN_BOT_PAT }} + SLACK_WEBHOOK_PR_URL: ${{ secrets.SLACK_DEV_ADAPTER_PULL_REQUESTS }} + SLACK_WEBHOOK_ALERTS_URL: ${{ secrets.SLACK_DEV_ADAPTER_ALERTS }} diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index b61c83d7..b4ac615d 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -23,7 +23,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] steps: - name: Check out repository diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index caf34209..b7835274 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,19 +18,19 @@ repos: - id: dbt-core-in-adapters-check - repo: https://github.com/psf/black - rev: 24.4.0 + rev: 24.8.0 hooks: - id: black args: - --line-length=99 - - --target-version=py38 - --target-version=py39 - --target-version=py310 - --target-version=py311 + - --target-version=py312 - --force-exclude=dbt/adapters/events/adapter_types_pb2.py - repo: https://github.com/pycqa/flake8 - rev: 7.0.0 + rev: 7.1.1 hooks: - id: flake8 exclude: dbt/adapters/events/adapter_types_pb2.py|tests/functional/ @@ -38,10 +38,10 @@ repos: - --max-line-length=99 - --select=E,F,W - --ignore=E203,E501,E704,E741,W503,W504 - - --per-file-ignores=*/__init__.py:F401 + - --per-file-ignores=*/__init__.py:F401,*/conftest.py:F401 - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.9.0 + rev: v1.11.2 hooks: - id: mypy exclude: dbt/adapters/events/adapter_types_pb2.py|dbt-tests-adapter/dbt/__init__.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4146e95e..9971e5ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,93 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html), and is generated by [Changie](https://github.com/miniscruff/changie). +## dbt-adapters 1.10.3 - October 29, 2024 + + + +## dbt-adapters 1.10.2 - October 01, 2024 + +### Under the Hood + +- dbt-tests-adapters: Add required begin to microbatch model config to BaseMicrobatch test ([#315](https://github.com/dbt-labs/dbt-adapters/issues/315)) + +## dbt-adapters 1.10.1 - September 16, 2024 + +## dbt-adapters 1.10.0 - September 12, 2024 + +## dbt-adapters 1.8.0 - October 29, 2024 + +### Fixes + +- Always make behavior flags available for evaluation ([#338](https://github.com/dbt-labs/dbt-adapters/issues/338)) + +### Under the Hood + +- Add adapter telemetry. 
([#301](https://github.com/dbt-labs/dbt-adapters/issues/301)) + +## dbt-adapters 1.7.2 - October 21, 2024 + +### Breaking Changes + +- Drop support for Python 3.8 ([#332](https://github.com/dbt-labs/dbt-adapters/issues/332)) + +### Features + +- Allows unique_key for snapshots to take a list ([#181](https://github.com/dbt-labs/dbt-adapters/issues/181)) + +### Fixes + +- Always validate an incremental model's `incremental_strategy` ([#330](https://github.com/dbt-labs/dbt-adapters/issues/330)) + +### Contributors +- [@agpapa](https://github.com/agpapa) ([#181](https://github.com/dbt-labs/dbt-adapters/issues/181)) + +## dbt-adapters 1.7.1 - October 15, 2024 + +### Features + +- Enable setting current value of dbt_valid_to ([#320](https://github.com/dbt-labs/dbt-adapters/issues/320)) + +## dbt-adapters 1.7.0 - September 19, 2024 + +### Features + +- Allow configuring of snapshot column names ([#289](https://github.com/dbt-labs/dbt-adapters/issues/289)) + +## dbt-adapters 1.6.1 - September 16, 2024 + +## dbt-adapters 1.6.0 - September 12, 2024 + +### Features + +- Default microbatch strategy implementation and base tests ([#302](https://github.com/dbt-labs/dbt-adapters/issues/302)) + +## dbt-adapters 1.5.0 - September 10, 2024 + +### Features + +- Compare 'snapshot_get_time' and snapshot 'updated_at' data types ([#242](https://github.com/dbt-labs/dbt-adapters/issues/242)) +- Add Behavior Flag framework ([#281](https://github.com/dbt-labs/dbt-adapters/issues/281)) +- Add EventTimeFilter to BaseRelation, which renders a filtered relation when start or end is set ([#294](https://github.com/dbt-labs/dbt-adapters/issues/294)) + +### Dependencies + +- Update dbt-common pin to >=1.8 ([#299](https://github.com/dbt-labs/dbt-adapters/pull/299)) + +## dbt-adapters 1.4.1 - August 09, 2024 + +### Fixes + +- Use model alias for the CTE identifier generated during ephemeral materialization ([#5273](https://github.com/dbt-labs/dbt-adapters/issues/5273)) + +### Under the Hood + +- Updating changie.yaml to add contributors and PR links ([#219](https://github.com/dbt-labs/dbt-adapters/issues/219)) + +### Contributors +- [@jeancochrane](https://github.com/jeancochrane) ([#5273](https://github.com/dbt-labs/dbt-adapters/issues/5273)) +- [@leahwicz](https://github.com/leahwicz) ([#219](https://github.com/dbt-labs/dbt-adapters/issues/219)) + ## dbt-adapters 1.4.0 - July 30, 2024 ### Features diff --git a/dbt-tests-adapter/dbt/tests/__about__.py b/dbt-tests-adapter/dbt/tests/__about__.py index 1b022739..977620c3 100644 --- a/dbt-tests-adapter/dbt/tests/__about__.py +++ b/dbt-tests-adapter/dbt/tests/__about__.py @@ -1 +1 @@ -version = "1.9.2" +version = "1.10.3" diff --git a/dbt-tests-adapter/dbt/tests/adapter/basic/files.py b/dbt-tests-adapter/dbt/tests/adapter/basic/files.py index 751b01a0..d0253a53 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/basic/files.py +++ b/dbt-tests-adapter/dbt/tests/adapter/basic/files.py @@ -186,6 +186,10 @@ {{ config(materialized="incremental") }} """ +config_materialized_incremental_invalid_strategy = """ + {{ config(materialized="incremental", incremental_strategy="bad_strategy") }} +""" + config_materialized_var = """ {{ config(materialized=var("materialized_var", "table"))}} """ @@ -217,3 +221,6 @@ ephemeral_view_sql = config_materialized_view + model_ephemeral ephemeral_table_sql = config_materialized_table + model_ephemeral incremental_sql = config_materialized_incremental + model_incremental +incremental_invalid_strategy_sql = ( + config_materialized_incremental_invalid_strategy 
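+    # string concatenation builds the test model: the config header carrying the intentionally invalid strategy, then the shared incremental model body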
+ model_incremental +) diff --git a/dbt-tests-adapter/dbt/tests/adapter/basic/test_incremental.py b/dbt-tests-adapter/dbt/tests/adapter/basic/test_incremental.py index fe04a5a1..57cc4db9 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/basic/test_incremental.py +++ b/dbt-tests-adapter/dbt/tests/adapter/basic/test_incremental.py @@ -86,6 +86,45 @@ def test_incremental_not_schema_change(self, project): assert run_result == RunStatus.Success +class BaseIncrementalBadStrategy: + @pytest.fixture(scope="class") + def project_config_update(self): + return {"name": "incremental"} + + @pytest.fixture(scope="class") + def models(self): + return { + "incremental.sql": files.incremental_invalid_strategy_sql, + "schema.yml": files.schema_base_yml, + } + + @pytest.fixture(scope="class") + def seeds(self): + return {"base.csv": files.seeds_base_csv, "added.csv": files.seeds_added_csv} + + @pytest.fixture(autouse=True) + def clean_up(self, project): + yield + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.adapter.drop_schema(relation) + + def test_incremental_invalid_strategy(self, project): + # seed command + results = run_dbt(["seed"]) + assert len(results) == 2 + + # try to run the incremental model, it should fail on the first attempt + results = run_dbt(["run"], expect_pass=False) + assert len(results.results) == 1 + assert ( + 'dbt could not find an incremental strategy macro with the name "get_incremental_bad_strategy_sql"' + in results.results[0].message + ) + + class Testincremental(BaseIncremental): pass diff --git a/dbt-tests-adapter/dbt/tests/adapter/hooks/test_run_hooks.py b/dbt-tests-adapter/dbt/tests/adapter/hooks/test_run_hooks.py index 89565c70..0136e3f6 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/hooks/test_run_hooks.py +++ b/dbt-tests-adapter/dbt/tests/adapter/hooks/test_run_hooks.py @@ -3,6 +3,7 @@ import pytest +from dbt_common.exceptions import DbtDatabaseError from dbt.tests.adapter.hooks import fixtures from dbt.tests.util import check_table_does_not_exist, run_dbt @@ -11,8 +12,8 @@ class BasePrePostRunHooks: @pytest.fixture(scope="function") def setUp(self, project): project.run_sql_file(project.test_data_dir / Path("seed_run.sql")) - project.run_sql(f"drop table if exists { project.test_schema }.schemas") - project.run_sql(f"drop table if exists { project.test_schema }.db_schemas") + project.run_sql(f"drop table if exists {project.test_schema}.schemas") + project.run_sql(f"drop table if exists {project.test_schema}.db_schemas") os.environ["TERM_TEST"] = "TESTING" @pytest.fixture(scope="class") @@ -158,7 +159,8 @@ def project_config_update(self): } def test_missing_column_pre_hook(self, project): - run_dbt(["run"], expect_pass=False) + with pytest.raises(DbtDatabaseError): + run_dbt(["run"], expect_pass=False) class TestAfterRunHooks(BaseAfterRunHooks): diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py new file mode 100644 index 00000000..5bbabbe1 --- /dev/null +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_microbatch.py @@ -0,0 +1,96 @@ +import os +from pprint import pformat +from unittest import mock + +import pytest + +from dbt.tests.util import relation_from_name, run_dbt + +try: + # patch_microbatch_end_time introduced in dbt 1.9.0 + from dbt.tests.util import patch_microbatch_end_time +except ImportError: + 
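+    # dbt-core < 1.9 does not ship this helper; freezegun's freeze_time patches the clock equivalently for these tests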
from freezegun import freeze_time as patch_microbatch_end_time + +_input_model_sql = """ +{{ config(materialized='table', event_time='event_time') }} +select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time +union all +select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time +union all +select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time +""" + +_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +select * from {{ ref('input_model') }} +""" + + +class BaseMicrobatch: + @pytest.fixture(scope="class") + def microbatch_model_sql(self) -> str: + """ + This is the SQL that defines the microbatch model, including any {{ config(..) }} + """ + return _microbatch_model_sql + + @pytest.fixture(scope="class") + def input_model_sql(self) -> str: + """ + This is the SQL that defines the input model to the microbatch model, including any {{ config(..) }}. + event_time is a required configuration of this input + """ + return _input_model_sql + + @pytest.fixture(scope="class") + def insert_two_rows_sql(self, project) -> str: + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" + + @pytest.fixture(scope="class") + def models(self, microbatch_model_sql, input_model_sql): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select * from {relation}", fetch="all") + + assert len(result) == expected_row_count, f"{relation_name}:{pformat(result)}" + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project, insert_two_rows_sql): + # initial run -- backfills all data + with patch_microbatch_end_time("2020-01-03 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # our partition grain is "day" so running the same day without new data should produce the same results + with patch_microbatch_end_time("2020-01-03 14:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # add next two days of data + project.run_sql(insert_two_rows_sql) + + self.assert_row_count(project, "input_model", 5) + + # re-run without changing current time => no insert + with patch_microbatch_end_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 3) + + # re-run by advancing time by one day changing current time => insert 1 row + with patch_microbatch_end_time("2020-01-04 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 4) + + # re-run by advancing time by one more day changing current time => insert 1 more row + with patch_microbatch_end_time("2020-01-05 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 5) diff --git a/dbt-tests-adapter/pyproject.toml b/dbt-tests-adapter/pyproject.toml index 73bce49e..d2f732b7 100644 --- a/dbt-tests-adapter/pyproject.toml +++ 
b/dbt-tests-adapter/pyproject.toml @@ -4,7 +4,7 @@ name = "dbt-tests-adapter" description = "The set of reusable tests and test fixtures used to test common functionality" readme = "README.md" keywords = ["dbt", "adapter", "adapters", "database", "elt", "dbt-core", "dbt Core", "dbt Cloud", "dbt Labs"] -requires-python = ">=3.8.0" +requires-python = ">=3.9.0" authors = [ { name = "dbt Labs", email = "info@dbtlabs.com" }, ] @@ -17,7 +17,6 @@ classifiers = [ "Operating System :: MacOS :: MacOS X", "Operating System :: Microsoft :: Windows", "Operating System :: POSIX :: Linux", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", @@ -31,6 +30,7 @@ dependencies = [ # `dbt-core` takes the packages below as dependencies, so they are unpinned to avoid conflicts "dbt-adapters", "pyyaml", + "freezegun", ] [project.urls] Homepage = "https://github.com/dbt-labs/dbt-adapters" diff --git a/dbt/adapters/__about__.py b/dbt/adapters/__about__.py index d619c757..6aaa73b8 100644 --- a/dbt/adapters/__about__.py +++ b/dbt/adapters/__about__.py @@ -1 +1 @@ -version = "1.4.0" +version = "1.8.0" diff --git a/dbt/adapters/base/__init__.py b/dbt/adapters/base/__init__.py index ade1af3d..c30dd01f 100644 --- a/dbt/adapters/base/__init__.py +++ b/dbt/adapters/base/__init__.py @@ -12,4 +12,5 @@ BaseRelation, RelationType, SchemaSearchMap, + AdapterTrackingRelationInfo, ) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index e0627c47..41481535 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -4,6 +4,7 @@ from contextlib import contextmanager from datetime import datetime from enum import Enum +from importlib import import_module from multiprocessing.context import SpawnContext from typing import ( Any, @@ -22,8 +23,9 @@ Union, TYPE_CHECKING, ) - +import os import pytz +from dbt_common.behavior_flags import Behavior, BehaviorFlag from dbt_common.clients.jinja import CallableMacroGenerator from dbt_common.contracts.constraints import ( ColumnLevelConstraint, @@ -54,18 +56,20 @@ BaseConnectionManager, Connection, ) -from dbt.adapters.base.meta import AdapterMeta, available +from dbt.adapters.base.meta import AdapterMeta, available, available_property from dbt.adapters.base.relation import ( BaseRelation, ComponentName, InformationSchema, SchemaSearchMap, + AdapterTrackingRelationInfo, ) from dbt.adapters.cache import RelationsCache, _make_ref_key_dict from dbt.adapters.capability import Capability, CapabilityDict from dbt.adapters.contracts.connection import Credentials from dbt.adapters.contracts.macros import MacroResolverProtocol from dbt.adapters.contracts.relation import RelationConfig + from dbt.adapters.events.types import ( CacheMiss, CatalogGenerationError, @@ -82,7 +86,6 @@ QuoteConfigTypeError, RelationReturnedMultipleResultsError, RenameToNoneAttemptedError, - SnapshotTargetIncompleteError, SnapshotTargetNotSnapshotTableError, UnexpectedNonTimestampError, ) @@ -261,7 +264,7 @@ class BaseAdapter(metaclass=AdapterMeta): MAX_SCHEMA_METADATA_RELATIONS = 100 - # This static member variable can be overriden in concrete adapter + # This static member variable can be overridden in concrete adapter # implementations to indicate adapter support for optional capabilities. 
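+    # For example, a concrete adapter might declare efficient catalog-metadata retrieval
+    # (CapabilitySupport and Support also live in dbt.adapters.capability):
+    #     _capabilities = CapabilityDict({Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full)})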
_capabilities = CapabilityDict({}) @@ -271,6 +274,8 @@ def __init__(self, config, mp_context: SpawnContext) -> None: self.connections = self.ConnectionManager(config, mp_context) self._macro_resolver: Optional[MacroResolverProtocol] = None self._macro_context_generator: Optional[MacroContextGeneratorCallable] = None + # this will be updated to include global behavior flags once they exist + self.behavior = [] # type: ignore ### # Methods to set / access a macro resolver @@ -291,6 +296,28 @@ def set_macro_context_generator( ) -> None: self._macro_context_generator = macro_context_generator + @available_property + def behavior(self) -> Behavior: + return self._behavior + + @behavior.setter # type: ignore + def behavior(self, flags: List[BehaviorFlag]) -> None: + flags.extend(self._behavior_flags) + + # we don't always get project flags, for example, the project file is not loaded during `dbt debug` + # in that case, load the default values for behavior flags to avoid compilation errors + # this mimics not loading a project file, or not specifying flags in a project file + user_overrides = getattr(self.config, "flags", {}) + + self._behavior = Behavior(flags, user_overrides) + + @property + def _behavior_flags(self) -> List[BehaviorFlag]: + """ + This method should be overwritten by adapter maintainers to provide platform-specific flags + """ + return [] + ### # Methods that pass through to the connection manager ### @@ -740,7 +767,9 @@ def get_missing_columns( return [col for (col_name, col) in from_columns.items() if col_name in missing_columns] @available.parse_none - def valid_snapshot_target(self, relation: BaseRelation) -> None: + def valid_snapshot_target( + self, relation: BaseRelation, column_names: Optional[Dict[str, str]] = None + ) -> None: """Ensure that the target relation is valid, by making sure it has the expected columns. @@ -758,21 +787,16 @@ def valid_snapshot_target(self, relation: BaseRelation) -> None: columns = self.get_columns_in_relation(relation) names = set(c.name.lower() for c in columns) - expanded_keys = ("scd_id", "valid_from", "valid_to") - extra = [] missing = [] - for legacy in expanded_keys: - desired = "dbt_" + legacy + # Note: we're not checking dbt_updated_at here because it's not + # always present. 
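+        # look each required internal column up through the user-configured name map
+        # (snapshot_table_column_names) when one is provided, defaulting to the standard dbt_* names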
+ for column in ("dbt_scd_id", "dbt_valid_from", "dbt_valid_to"): + desired = column_names[column] if column_names else column if desired not in names: missing.append(desired) - if legacy in names: - extra.append(legacy) if missing: - if extra: - raise SnapshotTargetIncompleteError(extra, missing) - else: - raise SnapshotTargetNotSnapshotTableError(missing) + raise SnapshotTargetNotSnapshotTableError(missing) @available.parse_none def expand_target_column_types( @@ -1549,7 +1573,11 @@ def valid_incremental_strategies(self): return ["append"] def builtin_incremental_strategies(self): - return ["append", "delete+insert", "merge", "insert_overwrite"] + builtin_strategies = ["append", "delete+insert", "merge", "insert_overwrite"] + if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"): + builtin_strategies.append("microbatch") + + return builtin_strategies @available.parse_none def get_incremental_strategy_macro(self, model_context, strategy: str): @@ -1719,6 +1747,30 @@ def capabilities(cls) -> CapabilityDict: def supports(cls, capability: Capability) -> bool: return bool(cls.capabilities()[capability]) + @classmethod + def get_adapter_run_info(cls, config: RelationConfig) -> AdapterTrackingRelationInfo: + adapter_class_name, *_ = cls.__name__.split("Adapter") + adapter_name = adapter_class_name.lower() + + if adapter_name == "base": + adapter_version = "" + else: + adapter_version = import_module(f"dbt.adapters.{adapter_name}.__version__").version + + return AdapterTrackingRelationInfo( + adapter_name=adapter_name, + base_adapter_version=import_module("dbt.adapters.__about__").version, + adapter_version=adapter_version, + model_adapter_details=cls._get_adapter_specific_run_info(config), + ) + + @classmethod + def _get_adapter_specific_run_info(cls, config) -> Dict[str, Any]: + """ + Adapter maintainers should overwrite this method to return any run metadata that should be captured during a run. + """ + return {} + COLUMNS_EQUAL_SQL = """ with diff_count as ( diff --git a/dbt/adapters/base/meta.py b/dbt/adapters/base/meta.py index ca7aef9f..e522a056 100644 --- a/dbt/adapters/base/meta.py +++ b/dbt/adapters/base/meta.py @@ -92,6 +92,25 @@ def parse_list(self, func: Callable) -> Callable: available = _Available() +class available_property(property): + """ + This supports making dynamic properties (`@property`) available in the jinja context. + + We use `@available` to make methods available in the jinja context, but this mechanism relies on the method being callable. + Intuitively, we should be able to use both `@available` and `@property` to create a dynamic property that's available in the jinja context. + + Using the `@property` decorator as the inner decorator supplies `@available` with something that is not callable. + Instead of returning the method, `@property` returns the value itself, not the method that is called to create the value. + + Using the `@available` decorator as the inner decorator adds `_is_available_ = True` to the function. + However, when the `@property` decorator executes, it returns a `property` object which does not have the `_is_available_` attribute. + + This decorator solves this problem by simply adding `_is_available_ = True` as an attribute on the `property` built-in. 
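+
+    For example, `BaseAdapter.behavior` (impl.py) uses this decorator so that jinja
+    sees the resolved `Behavior` value rather than an uncalled method:
+
+        @available_property
+        def behavior(self) -> Behavior:
+            return self._behavior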
+ """ + + _is_available_ = True + + class AdapterMeta(abc.ABCMeta): _available_: FrozenSet[str] _parse_replacements_: Dict[str, Callable] diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index 1aab7b2f..7d4888e4 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -1,10 +1,12 @@ from collections.abc import Hashable from dataclasses import dataclass, field +from datetime import datetime from typing import ( Any, Dict, FrozenSet, Iterator, + List, Optional, Set, Tuple, @@ -36,6 +38,13 @@ SerializableIterable = Union[Tuple, FrozenSet] +@dataclass +class EventTimeFilter(FakeAPIObject): + field_name: str + start: Optional[datetime] = None + end: Optional[datetime] = None + + @dataclass(frozen=True, eq=False, repr=False) class BaseRelation(FakeAPIObject, Hashable): path: Path @@ -47,6 +56,7 @@ class BaseRelation(FakeAPIObject, Hashable): quote_policy: Policy = field(default_factory=lambda: Policy()) dbt_created: bool = False limit: Optional[int] = None + event_time_filter: Optional[EventTimeFilter] = None require_alias: bool = ( True # used to govern whether to add an alias when render_limited is called ) @@ -208,14 +218,19 @@ def render(self) -> str: # if there is nothing set, this will return the empty string. return ".".join(part for _, part in self._render_iterator() if part is not None) - def _render_limited_alias(self) -> str: + def _render_subquery_alias(self, namespace: str) -> str: """Some databases require an alias for subqueries (postgres, mysql) for all others we want to avoid adding an alias as it has the potential to introduce issues with the query if the user also defines an alias. """ if self.require_alias: - return f" _dbt_limit_subq_{self.table}" + return f" _dbt_{namespace}_subq_{self.table}" return "" + def _render_limited_alias( + self, + ) -> str: + return self._render_subquery_alias(namespace="limit") + def render_limited(self) -> str: rendered = self.render() if self.limit is None: @@ -225,6 +240,31 @@ def render_limited(self) -> str: else: return f"(select * from {rendered} limit {self.limit}){self._render_limited_alias()}" + def render_event_time_filtered(self, rendered: Optional[str] = None) -> str: + rendered = rendered or self.render() + if self.event_time_filter is None: + return rendered + + filter = self._render_event_time_filtered(self.event_time_filter) + if not filter: + return rendered + + return f"(select * from {rendered} where {filter}){self._render_subquery_alias(namespace='et_filter')}" + + def _render_event_time_filtered(self, event_time_filter: EventTimeFilter) -> str: + """ + Returns "" if start and end are both None + """ + filter = "" + if event_time_filter.start and event_time_filter.end: + filter = f"{event_time_filter.field_name} >= '{event_time_filter.start}' and {event_time_filter.field_name} < '{event_time_filter.end}'" + elif event_time_filter.start: + filter = f"{event_time_filter.field_name} >= '{event_time_filter.start}'" + elif event_time_filter.end: + filter = f"{event_time_filter.field_name} < '{event_time_filter.end}'" + + return filter + def quoted(self, identifier): return "{quote_char}{identifier}{quote_char}".format( quote_char=self.quote_character, @@ -240,6 +280,7 @@ def create_ephemeral_from( cls: Type[Self], relation_config: RelationConfig, limit: Optional[int] = None, + event_time_filter: Optional[EventTimeFilter] = None, ) -> Self: # Note that ephemeral models are based on the identifier, which will # point to the model's alias if one exists and otherwise fall back to @@ 
-250,6 +291,7 @@ def create_ephemeral_from( type=cls.CTE, identifier=identifier, limit=limit, + event_time_filter=event_time_filter, ).quote(identifier=False) @classmethod @@ -300,6 +342,16 @@ def create( ) return cls.from_dict(kwargs) + @classmethod + def scd_args(cls: Type[Self], primary_key: Union[str, List[str]], updated_at) -> List[str]: + scd_args = [] + if isinstance(primary_key, list): + scd_args.extend(primary_key) + else: + scd_args.append(primary_key) + scd_args.append(updated_at) + return scd_args + @property def can_be_renamed(self) -> bool: return self.type in self.renameable_relations @@ -315,7 +367,14 @@ def __hash__(self) -> int: return hash(self.render()) def __str__(self) -> str: - return self.render() if self.limit is None else self.render_limited() + rendered = self.render() if self.limit is None else self.render_limited() + + # Limited subquery is wrapped by the event time filter subquery, and not the other way around. + # This is because in the context of resolving limited refs, we care more about performance than reliably producing a sample of a certain size. + if self.event_time_filter: + rendered = self.render_event_time_filtered(rendered) + + return rendered @property def database(self) -> Optional[str]: @@ -483,3 +542,11 @@ def flatten(self, allow_multiple_databases: bool = False) -> "SchemaSearchMap": ) return new + + +@dataclass(frozen=True, eq=False, repr=False) +class AdapterTrackingRelationInfo(FakeAPIObject, Hashable): + adapter_name: str + base_adapter_version: str + adapter_version: str + model_adapter_details: Any diff --git a/dbt/adapters/exceptions/compilation.py b/dbt/adapters/exceptions/compilation.py index 46ca5219..d82924e3 100644 --- a/dbt/adapters/exceptions/compilation.py +++ b/dbt/adapters/exceptions/compilation.py @@ -150,8 +150,10 @@ def __init__(self, missing: List): super().__init__(msg=self.get_message()) def get_message(self) -> str: - msg = 'Snapshot target is not a snapshot table (missing "{}")'.format( - '", "'.join(self.missing) + missing = '", "'.join(self.missing) + msg = ( + f'Snapshot target is missing configured columns (missing "{missing}"). ' + "See https://docs.getdbt.com/docs/build/snapshots#snapshot-meta-fields for more information." 
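+            # adjacent string literals concatenate into one message; the listed names may be
+            # user-supplied via the snapshot_table_column_names config rather than the dbt_* defaults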
) return msg diff --git a/dbt/adapters/factory.py b/dbt/adapters/factory.py index b1854f67..8a018619 100644 --- a/dbt/adapters/factory.py +++ b/dbt/adapters/factory.py @@ -188,7 +188,7 @@ def get_include_paths(self, name: Optional[str]) -> List[Path]: def get_adapter_type_names(self, name: Optional[str]) -> List[str]: return [p.adapter.type() for p in self.get_adapter_plugins(name)] - def get_adapter_constraint_support(self, name: Optional[str]) -> List[str]: + def get_adapter_constraint_support(self, name: Optional[str]) -> Dict[str, str]: return self.lookup_adapter(name).CONSTRAINT_SUPPORT # type: ignore @@ -251,7 +251,7 @@ def get_adapter_type_names(name: Optional[str]) -> List[str]: return FACTORY.get_adapter_type_names(name) -def get_adapter_constraint_support(name: Optional[str]) -> List[str]: +def get_adapter_constraint_support(name: Optional[str]) -> Dict[str, str]: return FACTORY.get_adapter_constraint_support(name) diff --git a/dbt/include/global_project/macros/adapters/timestamps.sql b/dbt/include/global_project/macros/adapters/timestamps.sql index 64b5fd3d..c936c844 100644 --- a/dbt/include/global_project/macros/adapters/timestamps.sql +++ b/dbt/include/global_project/macros/adapters/timestamps.sql @@ -15,6 +15,14 @@ {{ current_timestamp() }} {% endmacro %} +{% macro get_snapshot_get_time_data_type() %} + {% set snapshot_time = adapter.dispatch('snapshot_get_time', 'dbt')() %} + {% set time_data_type_sql = 'select ' ~ snapshot_time ~ ' as dbt_snapshot_time' %} + {% set snapshot_time_column_schema = get_column_schema_from_query(time_data_type_sql) %} + {% set time_data_type = snapshot_time_column_schema[0].dtype %} + {{ return(time_data_type or none) }} +{% endmacro %} + --------------------------------------------- /* {# diff --git a/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql b/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql index f932751a..41d2de26 100644 --- a/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql +++ b/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql @@ -32,6 +32,9 @@ {% set to_drop = [] %} + {% set incremental_strategy = config.get('incremental_strategy') or 'default' %} + {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %} + {% if existing_relation is none %} {% set build_sql = get_create_table_as_sql(False, target_relation, sql) %} {% elif full_refresh_mode %} @@ -52,9 +55,7 @@ {% endif %} {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#} - {% set incremental_strategy = config.get('incremental_strategy') or 'default' %} {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} - {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %} {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %} {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %} diff --git a/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql b/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql index 72082cca..111d3887 100644 --- a/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql +++ 
b/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql @@ -66,6 +66,19 @@ {% endmacro %} +{% macro get_incremental_microbatch_sql(arg_dict) %} + + {{ return(adapter.dispatch('get_incremental_microbatch_sql', 'dbt')(arg_dict)) }} + +{% endmacro %} + +{% macro default__get_incremental_microbatch_sql(arg_dict) %} + + {{ exceptions.raise_not_implemented('microbatch materialization strategy not implemented for adapter ' + adapter.type()) }} + +{% endmacro %} + + {% macro get_insert_into_sql(target_relation, temp_relation, dest_columns) %} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index bb71974c..b4cd7c14 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -34,7 +34,12 @@ {{ adapter.dispatch('snapshot_staging_table', 'dbt')(strategy, source_sql, target_relation) }} {% endmacro %} +{% macro get_snapshot_table_column_names() %} + {{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at'}) }} +{% endmacro %} + {% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%} + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} with snapshot_query as ( @@ -44,35 +49,35 @@ snapshotted_data as ( - select *, - {{ strategy.unique_key }} as dbt_unique_key - + select *, {{ unique_key_fields(strategy.unique_key) }} from {{ target_relation }} - where dbt_valid_to is null + where + {% if config.get('dbt_valid_to_current') %} + {# Check for either dbt_valid_to_current OR null, in order to correctly update records with nulls #} + ( {{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or {{ columns.dbt_valid_to }} is null) + {% else %} + {{ columns.dbt_valid_to }} is null + {% endif %} ), insertions_source_data as ( - select - *, - {{ strategy.unique_key }} as dbt_unique_key, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to, - {{ strategy.scd_id }} as dbt_scd_id + select *, {{ unique_key_fields(strategy.unique_key) }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ get_dbt_valid_to_current(strategy, columns) }}, + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }} from snapshot_query ), updates_source_data as ( - select - *, - {{ strategy.unique_key }} as dbt_unique_key, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - {{ strategy.updated_at }} as dbt_valid_to + select *, {{ unique_key_fields(strategy.unique_key) }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_to }} from snapshot_query ), @@ -81,9 +86,7 @@ deletes_source_data as ( - select - *, - {{ strategy.unique_key }} as dbt_unique_key + select *, {{ unique_key_fields(strategy.unique_key) }} from snapshot_query ), {% endif %} @@ -95,13 +98,11 @@ source_data.* from insertions_source_data as source_data - left outer join snapshotted_data on snapshotted_data.dbt_unique_key = 
source_data.dbt_unique_key - where snapshotted_data.dbt_unique_key is null - or ( - snapshotted_data.dbt_unique_key is not null - and ( - {{ strategy.row_changed }} - ) + left outer join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }} + or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ({{ strategy.row_changed }}) + ) ), @@ -111,10 +112,11 @@ select 'update' as dbt_change_type, source_data.*, - snapshotted_data.dbt_scd_id + snapshotted_data.{{ columns.dbt_scd_id }} from updates_source_data as source_data - join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} where ( {{ strategy.row_changed }} ) @@ -128,14 +130,15 @@ select 'delete' as dbt_change_type, source_data.*, - {{ snapshot_get_time() }} as dbt_valid_from, - {{ snapshot_get_time() }} as dbt_updated_at, - {{ snapshot_get_time() }} as dbt_valid_to, - snapshotted_data.dbt_scd_id + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }}, + snapshotted_data.{{ columns.dbt_scd_id }} from snapshotted_data - left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where source_data.dbt_unique_key is null + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} ) {%- endif %} @@ -155,12 +158,13 @@ {% endmacro %} {% macro default__build_snapshot_table(strategy, sql) %} + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} select *, - {{ strategy.scd_id }} as dbt_scd_id, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ get_dbt_valid_to_current(strategy, columns) }} from ( {{ sql }} ) sbq @@ -179,3 +183,77 @@ {% do return(temp_relation) %} {% endmacro %} + + +{% macro get_updated_at_column_data_type(snapshot_sql) %} + {% set snapshot_sql_column_schema = get_column_schema_from_query(snapshot_sql) %} + {% set dbt_updated_at_data_type = null %} + {% set ns = namespace() -%} {#-- handle for-loop scoping with a namespace --#} + {% set ns.dbt_updated_at_data_type = null -%} + {% for column in snapshot_sql_column_schema %} + {% if ((column.column == 'dbt_updated_at') or (column.column == 'DBT_UPDATED_AT')) %} + {% set ns.dbt_updated_at_data_type = column.dtype %} + {% endif %} + {% endfor %} + {{ return(ns.dbt_updated_at_data_type or none) }} +{% endmacro %} + + +{% macro check_time_data_types(sql) %} + {% set dbt_updated_at_data_type = get_updated_at_column_data_type(sql) %} + {% set snapshot_get_time_data_type = get_snapshot_get_time_data_type() %} + {% if snapshot_get_time_data_type is not none and dbt_updated_at_data_type is not none and snapshot_get_time_data_type != dbt_updated_at_data_type %} + {% if exceptions.warn_snapshot_timestamp_data_types %} + {{ 
exceptions.warn_snapshot_timestamp_data_types(snapshot_get_time_data_type, dbt_updated_at_data_type) }} + {% endif %} + {% endif %} +{% endmacro %} + + +{% macro get_dbt_valid_to_current(strategy, columns) %} + {% set dbt_valid_to_current = config.get('dbt_valid_to_current') or "null" %} + coalesce(nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}), {{dbt_valid_to_current}}) + as {{ columns.dbt_valid_to }} +{% endmacro %} + + +{% macro unique_key_fields(unique_key) %} + {% if unique_key | is_list %} + {% for key in unique_key %} + {{ key }} as dbt_unique_key_{{ loop.index }} + {%- if not loop.last %} , {%- endif %} + {% endfor %} + {% else %} + {{ unique_key }} as dbt_unique_key + {% endif %} +{% endmacro %} + + +{% macro unique_key_join_on(unique_key, identifier, from_identifier) %} + {% if unique_key | is_list %} + {% for key in unique_key %} + {{ identifier }}.dbt_unique_key_{{ loop.index }} = {{ from_identifier }}.dbt_unique_key_{{ loop.index }} + {%- if not loop.last %} and {%- endif %} + {% endfor %} + {% else %} + {{ identifier }}.dbt_unique_key = {{ from_identifier }}.dbt_unique_key + {% endif %} +{% endmacro %} + + +{% macro unique_key_is_null(unique_key, identifier) %} + {% if unique_key | is_list %} + {{ identifier }}.dbt_unique_key_1 is null + {% else %} + {{ identifier }}.dbt_unique_key is null + {% endif %} +{% endmacro %} + + +{% macro unique_key_is_not_null(unique_key, identifier) %} + {% if unique_key | is_list %} + {{ identifier }}.dbt_unique_key_1 is not null + {% else %} + {{ identifier }}.dbt_unique_key is not null + {% endif %} +{% endmacro %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql index b0fe9222..0c9590b6 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql @@ -1,5 +1,4 @@ {% materialization snapshot, default %} - {%- set config = model['config'] -%} {%- set target_table = model.get('alias', model.get('name')) -%} @@ -24,37 +23,45 @@ {{ run_hooks(pre_hooks, inside_transaction=True) }} {% set strategy_macro = strategy_dispatch(strategy_name) %} - {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %} + {# The model['config'] parameter below is no longer used, but passing anyway for compatibility #} + {# It was a dictionary of config, instead of the config object from the context #} + {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %} {% if not target_relation_exists %} {% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %} + {% set build_or_select_sql = build_sql %} {% set final_sql = create_table_as(False, target_relation, build_sql) %} {% else %} - {{ adapter.valid_snapshot_target(target_relation) }} + {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} + {{ adapter.valid_snapshot_target(target_relation, columns) }} + + {% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %} {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %} -- this may no-op if the database does not require column expansion {% do adapter.expand_target_column_types(from_relation=staging_table, to_relation=target_relation) %} + {% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 
'dbt_unique_key', 'DBT_UNIQUE_KEY'] %} + {% if unique_key | is_list %} + {% for key in strategy.unique_key %} + {{ remove_columns.append('dbt_unique_key_' + loop.index|string) }} + {{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }} + {% endfor %} + {% endif %} + {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% do create_columns(target_relation, missing_columns) %} {% set source_columns = adapter.get_columns_in_relation(staging_table) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% set quoted_source_columns = [] %} @@ -71,6 +78,9 @@ {% endif %} + + {{ check_time_data_types(build_or_select_sql) }} + {% call statement('main') %} {{ final_sql }} {% endcall %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql index 56798811..cf787e4f 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql @@ -7,15 +7,22 @@ {% macro default__snapshot_merge_sql(target, source, insert_cols) -%} {%- set insert_cols_csv = insert_cols | join(', ') -%} + {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%} + merge into {{ target.render() }} as DBT_INTERNAL_DEST using {{ source }} as DBT_INTERNAL_SOURCE - on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id + on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }} when matched - and DBT_INTERNAL_DEST.dbt_valid_to is null + {% if config.get("dbt_valid_to_current") %} + and (DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or + DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null) + {% else %} + and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null + {% endif %} and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete') then update - set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to + set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }} when not matched and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert' diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index d22cc336..f9f5afbd 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -49,10 +49,13 @@ {# Core strategy definitions #} -{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} - {% set primary_key = config['unique_key'] %} - {% set updated_at = config['updated_at'] %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} + +{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, model_config, target_exists) %} + {# The model_config parameter is no 
longer used, but is passed in anyway for compatibility. #} + {% set primary_key = config.get('unique_key') %} + {% set updated_at = config.get('updated_at') %} + {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %} + {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} {#/* The snapshot relation might not have an {{ updated_at }} value if the @@ -64,10 +67,11 @@ See https://github.com/dbt-labs/dbt-core/issues/2350 */ #} {% set row_changed_expr -%} - ({{ snapshotted_rel }}.dbt_valid_from < {{ current_rel }}.{{ updated_at }}) + ({{ snapshotted_rel }}.{{ columns.dbt_valid_from }} < {{ current_rel }}.{{ updated_at }}) {%- endset %} - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} + {% set scd_args = api.Relation.scd_args(primary_key, updated_at) %} + {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} {% do return({ "unique_key": primary_key, @@ -133,11 +137,12 @@ {%- endmacro %} -{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} - {% set check_cols_config = config['check_cols'] %} - {% set primary_key = config['unique_key'] %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} - {% set updated_at = config.get('updated_at', snapshot_get_time()) %} +{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, model_config, target_exists) %} + {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} + {% set check_cols_config = config.get('check_cols') %} + {% set primary_key = config.get('unique_key') %} + {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %} + {% set updated_at = config.get('updated_at') or snapshot_get_time() %} {% set column_added = false %} @@ -162,7 +167,8 @@ ) {%- endset %} - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} + {% set scd_args = api.Relation.scd_args(primary_key, updated_at) %} + {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} {% do return({ "unique_key": primary_key, diff --git a/pyproject.toml b/pyproject.toml index e794781c..7a8d1a50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ name = "dbt-adapters" description = "The set of adapter protocols and base functionality that supports integration with dbt-core" readme = "README.md" keywords = ["dbt", "adapter", "adapters", "database", "elt", "dbt-core", "dbt Core", "dbt Cloud", "dbt Labs"] -requires-python = ">=3.8.0" +requires-python = ">=3.9.0" authors = [ { name = "dbt Labs", email = "info@dbtlabs.com" }, ] @@ -17,14 +17,13 @@ classifiers = [ "Operating System :: MacOS :: MacOS X", "Operating System :: Microsoft :: Windows", "Operating System :: POSIX :: Linux", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", ] dependencies = [ - "dbt-common>=1.6,<2.0", + "dbt-common>=1.11,<2.0", "pytz>=2015.7", # installed via dbt-common but used directly "agate>=1.0,<2.0", diff --git a/tests/unit/behavior_flag_tests/__init__.py b/tests/unit/behavior_flag_tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/behavior_flag_tests/test_behavior_flags.py b/tests/unit/behavior_flag_tests/test_behavior_flags.py new file mode 100644 index 00000000..7f3abb89 --- /dev/null +++ 
b/tests/unit/behavior_flag_tests/test_behavior_flags.py @@ -0,0 +1,66 @@ +from typing import Any, Dict, List + +from dbt_common.behavior_flags import BehaviorFlag +from dbt_common.exceptions import DbtBaseException +import pytest + + +@pytest.fixture +def flags() -> Dict[str, Any]: + return { + "unregistered_flag": True, + "default_false_user_false_flag": False, + "default_false_user_true_flag": True, + "default_true_user_false_flag": False, + "default_true_user_true_flag": True, + } + + +@pytest.fixture +def behavior_flags() -> List[BehaviorFlag]: + return [ + { + "name": "default_false_user_false_flag", + "default": False, + "docs_url": "https://docs.com", + }, + { + "name": "default_false_user_true_flag", + "default": False, + "description": "This is a false flag.", + }, + { + "name": "default_false_user_skip_flag", + "default": False, + "description": "This is a true flag.", + }, + { + "name": "default_true_user_false_flag", + "default": True, + "description": "This is fake news.", + }, + { + "name": "default_true_user_true_flag", + "default": True, + "docs_url": "https://moar.docs.com", + }, + { + "name": "default_true_user_skip_flag", + "default": True, + "description": "This is a true flag.", + }, + ] + + +def test_register_behavior_flags(adapter): + # make sure that users cannot add arbitrary flags to this collection + with pytest.raises(DbtBaseException): + assert adapter.behavior.unregistered_flag + + # check the values of the valid behavior flags + assert not adapter.behavior.default_false_user_false_flag + assert adapter.behavior.default_false_user_true_flag + assert not adapter.behavior.default_false_user_skip_flag + assert not adapter.behavior.default_true_user_false_flag + assert adapter.behavior.default_true_user_true_flag + assert adapter.behavior.default_true_user_skip_flag diff --git a/tests/unit/behavior_flag_tests/test_empty_project.py b/tests/unit/behavior_flag_tests/test_empty_project.py new file mode 100644 index 00000000..f9fd7a76 --- /dev/null +++ b/tests/unit/behavior_flag_tests/test_empty_project.py @@ -0,0 +1,87 @@ +from types import SimpleNamespace +from typing import Any, Dict, List + +from dbt_common.behavior_flags import BehaviorFlag +from dbt_common.exceptions import DbtBaseException +import pytest + +from dbt.adapters.contracts.connection import AdapterRequiredConfig, QueryComment + +from tests.unit.fixtures.credentials import CredentialsStub + + +@pytest.fixture +def flags() -> Dict[str, Any]: + return { + "unregistered_flag": True, + "default_false_user_false_flag": False, + "default_false_user_true_flag": True, + "default_true_user_false_flag": False, + "default_true_user_true_flag": True, + } + + +@pytest.fixture +def config(flags) -> AdapterRequiredConfig: + raw_config = { + "credentials": CredentialsStub("test_database", "test_schema"), + "profile_name": "test_profile", + "target_name": "test_target", + "threads": 4, + "project_name": "test_project", + "query_comment": QueryComment(), + "cli_vars": {}, + "target_path": "path/to/nowhere", + "log_cache_events": False, + } + return SimpleNamespace(**raw_config) + + +@pytest.fixture +def behavior_flags() -> List[BehaviorFlag]: + return [ + { + "name": "default_false_user_false_flag", + "default": False, + "docs_url": "https://docs.com", + }, + { + "name": "default_false_user_true_flag", + "default": False, + "description": "This is a false flag.", + }, + { + "name": "default_false_user_skip_flag", + "default": False, + "description": "This is a true flag.", + }, + { + "name": 
"default_true_user_false_flag", + "default": True, + "description": "This is fake news.", + }, + { + "name": "default_true_user_true_flag", + "default": True, + "docs_url": "https://moar.docs.com", + }, + { + "name": "default_true_user_skip_flag", + "default": True, + "description": "This is a true flag.", + }, + ] + + +def test_register_behavior_flags(adapter): + # make sure that users cannot add arbitrary flags to this collection + with pytest.raises(DbtBaseException): + assert adapter.behavior.unregistered_flag + + # check the values of the valid behavior flags + assert not adapter.behavior.default_false_user_false_flag + assert not adapter.behavior.default_false_user_true_flag + assert not adapter.behavior.default_false_user_skip_flag + assert adapter.behavior.default_true_user_false_flag + assert adapter.behavior.default_true_user_true_flag + assert adapter.behavior.default_true_user_skip_flag diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py new file mode 100644 index 00000000..346634df --- /dev/null +++ b/tests/unit/conftest.py @@ -0,0 +1 @@ +from tests.unit.fixtures import adapter, behavior_flags, config, flags diff --git a/tests/unit/fixtures/__init__.py b/tests/unit/fixtures/__init__.py new file mode 100644 index 00000000..78135a2c --- /dev/null +++ b/tests/unit/fixtures/__init__.py @@ -0,0 +1 @@ +from tests.unit.fixtures.adapter import adapter, behavior_flags, config, flags diff --git a/tests/unit/fixtures/adapter.py b/tests/unit/fixtures/adapter.py new file mode 100644 index 00000000..b59b0423 --- /dev/null +++ b/tests/unit/fixtures/adapter.py @@ -0,0 +1,146 @@ +from multiprocessing import get_context +from types import SimpleNamespace +from typing import Any, Dict, List + +import agate +from dbt_common.behavior_flags import BehaviorFlag +import pytest + +from dbt.adapters.base.column import Column +from dbt.adapters.base.impl import BaseAdapter +from dbt.adapters.base.relation import BaseRelation +from dbt.adapters.contracts.connection import AdapterRequiredConfig, QueryComment + +from tests.unit.fixtures.connection_manager import ConnectionManagerStub +from tests.unit.fixtures.credentials import CredentialsStub + + +@pytest.fixture +def adapter(config, behavior_flags) -> BaseAdapter: + + class BaseAdapterStub(BaseAdapter): + """ + A stub for an adapter that uses the cache as the database + """ + + ConnectionManager = ConnectionManagerStub + + @property + def _behavior_flags(self) -> List[BehaviorFlag]: + return behavior_flags + + ### + # Abstract methods for database-specific values, attributes, and types + ### + @classmethod + def date_function(cls) -> str: + return "date_function" + + @classmethod + def is_cancelable(cls) -> bool: + return False + + def list_schemas(self, database: str) -> List[str]: + return list(self.cache.schemas) + + ### + # Abstract methods about relations + ### + def drop_relation(self, relation: BaseRelation) -> None: + self.cache_dropped(relation) + + def truncate_relation(self, relation: BaseRelation) -> None: + self.cache_dropped(relation) + + def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None: + self.cache_renamed(from_relation, to_relation) + + def get_columns_in_relation(self, relation: BaseRelation) -> List[Column]: + # there's no database, so these need to be added as kwargs in the existing_relations fixture + return relation.columns + + def expand_column_types(self, goal: BaseRelation, current: BaseRelation) -> None: + # there's no database, so these need to be added as kwargs in the 
diff --git a/tests/unit/fixtures/adapter.py b/tests/unit/fixtures/adapter.py
new file mode 100644
index 00000000..b59b0423
--- /dev/null
+++ b/tests/unit/fixtures/adapter.py
@@ -0,0 +1,146 @@
+from multiprocessing import get_context
+from types import SimpleNamespace
+from typing import Any, Dict, List
+
+import agate
+from dbt_common.behavior_flags import BehaviorFlag
+import pytest
+
+from dbt.adapters.base.column import Column
+from dbt.adapters.base.impl import BaseAdapter
+from dbt.adapters.base.relation import BaseRelation
+from dbt.adapters.contracts.connection import AdapterRequiredConfig, QueryComment
+
+from tests.unit.fixtures.connection_manager import ConnectionManagerStub
+from tests.unit.fixtures.credentials import CredentialsStub
+
+
+@pytest.fixture
+def adapter(config, behavior_flags) -> BaseAdapter:
+
+    class BaseAdapterStub(BaseAdapter):
+        """
+        A stub for an adapter that uses the cache as the database
+        """
+
+        ConnectionManager = ConnectionManagerStub
+
+        @property
+        def _behavior_flags(self) -> List[BehaviorFlag]:
+            return behavior_flags
+
+        ###
+        # Abstract methods for database-specific values, attributes, and types
+        ###
+        @classmethod
+        def date_function(cls) -> str:
+            return "date_function"
+
+        @classmethod
+        def is_cancelable(cls) -> bool:
+            return False
+
+        def list_schemas(self, database: str) -> List[str]:
+            return list(self.cache.schemas)
+
+        ###
+        # Abstract methods about relations
+        ###
+        def drop_relation(self, relation: BaseRelation) -> None:
+            self.cache_dropped(relation)
+
+        def truncate_relation(self, relation: BaseRelation) -> None:
+            self.cache_dropped(relation)
+
+        def rename_relation(self, from_relation: BaseRelation, to_relation: BaseRelation) -> None:
+            self.cache_renamed(from_relation, to_relation)
+
+        def get_columns_in_relation(self, relation: BaseRelation) -> List[Column]:
+            # there's no database, so these need to be added as kwargs in the existing_relations fixture
+            return relation.columns
+
+        def expand_column_types(self, goal: BaseRelation, current: BaseRelation) -> None:
+            # there's no database, so these need to be added as kwargs in the existing_relations fixture
+            object.__setattr__(current, "columns", goal.columns)
+
+        def list_relations_without_caching(
+            self, schema_relation: BaseRelation
+        ) -> List[BaseRelation]:
+            # there's no database, so use the cache as the database
+            return self.cache.get_relations(schema_relation.database, schema_relation.schema)
+
+        ###
+        # ODBC FUNCTIONS -- these should not need to change for every adapter,
+        # although some adapters may override them
+        ###
+        def create_schema(self, relation: BaseRelation):
+            # there's no database, this happens implicitly by adding a relation to the cache
+            pass
+
+        def drop_schema(self, relation: BaseRelation):
+            for each_relation in self.cache.get_relations(relation.database, relation.schema):
+                self.cache_dropped(each_relation)
+
+        @classmethod
+        def quote(cls, identifier: str) -> str:
+            quote_char = ""
+            return f"{quote_char}{identifier}{quote_char}"
+
+        ###
+        # Conversions: These must be implemented by concrete implementations, for
+        # converting agate types into their sql equivalents.
+        ###
+        @classmethod
+        def convert_text_type(cls, agate_table: agate.Table, col_idx: int) -> str:
+            return "str"
+
+        @classmethod
+        def convert_number_type(cls, agate_table: agate.Table, col_idx: int) -> str:
+            return "float"
+
+        @classmethod
+        def convert_boolean_type(cls, agate_table: agate.Table, col_idx: int) -> str:
+            return "bool"
+
+        @classmethod
+        def convert_datetime_type(cls, agate_table: agate.Table, col_idx: int) -> str:
+            return "datetime"
+
+        @classmethod
+        def convert_date_type(cls, *args, **kwargs):
+            return "date"
+
+        @classmethod
+        def convert_time_type(cls, *args, **kwargs):
+            return "time"
+
+    return BaseAdapterStub(config, get_context("spawn"))
+
+
+@pytest.fixture
+def config(flags) -> AdapterRequiredConfig:
+    raw_config = {
+        "credentials": CredentialsStub("test_database", "test_schema"),
+        "profile_name": "test_profile",
+        "target_name": "test_target",
+        "threads": 4,
+        "project_name": "test_project",
+        "query_comment": QueryComment(),
+        "cli_vars": {},
+        "target_path": "path/to/nowhere",
+        "log_cache_events": False,
+        "flags": flags,
+    }
+    return SimpleNamespace(**raw_config)
+
+
+@pytest.fixture
+def flags() -> Dict[str, Any]:
+    # this is the flags collection in dbt_project.yaml
+    return {}
+
+
+@pytest.fixture
+def behavior_flags() -> List[BehaviorFlag]:
+    # this is the collection of behavior flags for a specific adapter
+    return []
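One detail worth noting in `expand_column_types` above: it reaches for `object.__setattr__` because relations are frozen dataclasses, so plain attribute assignment raises at runtime. A self-contained illustration of that constraint (the `Rel` class is invented for the example):

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class Rel:
    name: str


r = Rel("a")
try:
    r.name = "b"  # normal assignment is blocked on frozen dataclasses
except FrozenInstanceError:
    object.__setattr__(r, "name", "b")  # the escape hatch the stub uses
assert r.name == "b"
```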
diff --git a/tests/unit/fixtures/connection_manager.py b/tests/unit/fixtures/connection_manager.py
new file mode 100644
index 00000000..8b353fbe
--- /dev/null
+++ b/tests/unit/fixtures/connection_manager.py
@@ -0,0 +1,58 @@
+from contextlib import contextmanager
+from typing import ContextManager, List, Optional, Tuple
+
+import agate
+
+from dbt.adapters.base.connections import BaseConnectionManager
+from dbt.adapters.contracts.connection import AdapterResponse, Connection, ConnectionState
+
+
+class ConnectionManagerStub(BaseConnectionManager):
+    """
+    A stub for a connection manager that does not connect to a database
+    """
+
+    raised_exceptions: List[Exception]
+
+    @contextmanager
+    def exception_handler(self, sql: str) -> ContextManager:  # type: ignore
+        # catch all exceptions and put them on this class for inspection in tests
+        try:
+            yield
+        except Exception as exc:
+            self.raised_exceptions.append(exc)
+        finally:
+            pass
+
+    def cancel_open(self) -> Optional[List[str]]:
+        names = []
+        for connection in self.thread_connections.values():
+            if connection.state == ConnectionState.OPEN:
+                connection.state = ConnectionState.CLOSED
+                if name := connection.name:
+                    names.append(name)
+        return names
+
+    @classmethod
+    def open(cls, connection: Connection) -> Connection:
+        # there's no database, so just change the state
+        connection.state = ConnectionState.OPEN
+        return connection
+
+    def begin(self) -> None:
+        # there's no database, so there are no transactions
+        pass
+
+    def commit(self) -> None:
+        # there's no database, so there are no transactions
+        pass
+
+    def execute(
+        self,
+        sql: str,
+        auto_begin: bool = False,
+        fetch: bool = False,
+        limit: Optional[int] = None,
+    ) -> Tuple[AdapterResponse, agate.Table]:
+        # there's no database, so just return the sql
+        return AdapterResponse(_message="", code=sql), agate.Table([])
diff --git a/tests/unit/fixtures/credentials.py b/tests/unit/fixtures/credentials.py
new file mode 100644
index 00000000..88817f6b
--- /dev/null
+++ b/tests/unit/fixtures/credentials.py
@@ -0,0 +1,13 @@
+from dbt.adapters.contracts.connection import Credentials
+
+
+class CredentialsStub(Credentials):
+    """
+    A stub for database credentials that does not connect to a database
+    """
+
+    def type(self) -> str:
+        return "test"
+
+    def _connection_keys(self):
+        return {"database": self.database, "schema": self.schema}
diff --git a/tests/unit/test_adapter_telemetry.py b/tests/unit/test_adapter_telemetry.py
new file mode 100644
index 00000000..1d5c4911
--- /dev/null
+++ b/tests/unit/test_adapter_telemetry.py
@@ -0,0 +1,15 @@
+import dbt.adapters.__about__
+
+from dbt.adapters.base.impl import BaseAdapter
+from dbt.adapters.base.relation import AdapterTrackingRelationInfo
+
+
+def test_telemetry_returns():
+    res = BaseAdapter.get_adapter_run_info({})
+
+    assert res.adapter_name == "base"
+    assert res.base_adapter_version == dbt.adapters.__about__.version
+    assert res.adapter_version == ""
+    assert res.model_adapter_details == {}
+
+    assert type(res) is AdapterTrackingRelationInfo
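Since `ConnectionManagerStub.execute` echoes the statement back in `AdapterResponse.code`, a test can assert on the exact SQL an adapter method would have issued without any warehouse round-trip. A small sketch, assuming the `adapter` fixture above:

```python
def test_execute_echoes_sql(adapter):
    # the stub returns the input SQL as the response code and an empty table
    response, table = adapter.connections.execute("select 1")
    assert response.code == "select 1"
    assert len(table.rows) == 0
```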
diff --git a/tests/unit/test_relation.py b/tests/unit/test_relation.py
index 97d56419..6d835e0d 100644
--- a/tests/unit/test_relation.py
+++ b/tests/unit/test_relation.py
@@ -1,8 +1,9 @@
 from dataclasses import dataclass, replace
-
+from datetime import datetime
 import pytest

 from dbt.adapters.base import BaseRelation
+from dbt.adapters.base.relation import EventTimeFilter
 from dbt.adapters.contracts.relation import RelationType

@@ -81,6 +82,80 @@ def test_render_limited(limit, require_alias, expected_result):
     assert str(my_relation) == expected_result


+@pytest.mark.parametrize(
+    "event_time_filter,require_alias,expected_result",
+    [
+        (None, False, '"test_database"."test_schema"."test_identifier"'),
+        (
+            EventTimeFilter(field_name="column"),
+            False,
+            '"test_database"."test_schema"."test_identifier"',
+        ),
+        (None, True, '"test_database"."test_schema"."test_identifier"'),
+        (
+            EventTimeFilter(field_name="column"),
+            True,
+            '"test_database"."test_schema"."test_identifier"',
+        ),
+        (
+            EventTimeFilter(field_name="column", start=datetime(year=2020, month=1, day=1)),
+            False,
+            """(select * from "test_database"."test_schema"."test_identifier" where column >= '2020-01-01 00:00:00')""",
+        ),
+        (
+            EventTimeFilter(field_name="column", start=datetime(year=2020, month=1, day=1)),
+            True,
+            """(select * from "test_database"."test_schema"."test_identifier" where column >= '2020-01-01 00:00:00') _dbt_et_filter_subq_test_identifier""",
+        ),
+        (
+            EventTimeFilter(field_name="column", end=datetime(year=2020, month=1, day=1)),
+            False,
+            """(select * from "test_database"."test_schema"."test_identifier" where column < '2020-01-01 00:00:00')""",
+        ),
+        (
+            EventTimeFilter(
+                field_name="column",
+                start=datetime(year=2020, month=1, day=1),
+                end=datetime(year=2020, month=1, day=2),
+            ),
+            False,
+            """(select * from "test_database"."test_schema"."test_identifier" where column >= '2020-01-01 00:00:00' and column < '2020-01-02 00:00:00')""",
+        ),
+    ],
+)
+def test_render_event_time_filtered(event_time_filter, require_alias, expected_result):
+    my_relation = BaseRelation.create(
+        database="test_database",
+        schema="test_schema",
+        identifier="test_identifier",
+        event_time_filter=event_time_filter,
+        require_alias=require_alias,
+    )
+    actual_result = my_relation.render_event_time_filtered()
+    assert actual_result == expected_result
+    assert str(my_relation) == expected_result
+
+
+def test_render_event_time_filtered_and_limited():
+    my_relation = BaseRelation.create(
+        database="test_database",
+        schema="test_schema",
+        identifier="test_identifier",
+        event_time_filter=EventTimeFilter(
+            field_name="column",
+            start=datetime(year=2020, month=1, day=1),
+            end=datetime(year=2020, month=1, day=2),
+        ),
+        limit=0,
+        require_alias=False,
+    )
+    expected_result = """(select * from (select * from "test_database"."test_schema"."test_identifier" where false limit 0) where column >= '2020-01-01 00:00:00' and column < '2020-01-02 00:00:00')"""
+
+    actual_result = my_relation.render_event_time_filtered(my_relation.render_limited())
+    assert actual_result == expected_result
+    assert str(my_relation) == expected_result
+
+
 def test_create_ephemeral_from_uses_identifier():
     @dataclass
     class Node: