From c4930e9a512576c714ba7c6a404a8ea99e86bff5 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 22 Aug 2024 16:33:33 -0400 Subject: [PATCH 01/38] initial rough-in with CLI flags --- core/dbt/artifacts/resources/v1/config.py | 2 + core/dbt/cli/main.py | 4 + core/dbt/cli/params.py | 18 +++++ core/dbt/context/providers.py | 52 +++++++++++-- pairing.md | 20 +++++ .../functional/microbatch/test_microbatch.py | 76 +++++++++++++++++++ 6 files changed, 167 insertions(+), 5 deletions(-) create mode 100644 pairing.md create mode 100644 tests/functional/microbatch/test_microbatch.py diff --git a/core/dbt/artifacts/resources/v1/config.py b/core/dbt/artifacts/resources/v1/config.py index fc9be8f7c70..050b37cff94 100644 --- a/core/dbt/artifacts/resources/v1/config.py +++ b/core/dbt/artifacts/resources/v1/config.py @@ -122,6 +122,8 @@ class NodeConfig(NodeAndTestConfig): default_factory=ContractConfig, metadata=MergeBehavior.Update.meta(), ) + # TODO: does this need to be Any? + event_time: Optional[str] = None def __post_init__(self): # we validate that node_color has a suitable value to prevent dbt-docs from crashing diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index ce0a42aae2e..41bd44f2682 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -539,6 +539,8 @@ def parse(ctx, **kwargs): @p.profiles_dir @p.project_dir @p.empty +@p.event_time_start +@p.event_time_end @p.select @p.selector @p.target_path @@ -790,6 +792,8 @@ def freshness(ctx, **kwargs): @p.store_failures @p.target_path @p.threads +# @p.event_time_start +# @p.event_time_end @p.vars @requires.postflight @requires.preflight diff --git a/core/dbt/cli/params.py b/core/dbt/cli/params.py index eb8e5594f76..7674798475d 100644 --- a/core/dbt/cli/params.py +++ b/core/dbt/cli/params.py @@ -91,6 +91,24 @@ is_flag=True, ) +event_time_end = click.option( + "--event-time-end", + envvar="DBT_EVENT_TIME_END", + # TODO: improve help text + help="upper bound to filter refs on", + type=click.STRING, + default=None, +) + +event_time_start = click.option( + "--event-time-start", + envvar="DBT_EVENT_TIME_START", + # TODO: improve help text + help="lower bound to filter refs on", + type=click.STRING, + default=None, +) + exclude = click.option( "--exclude", envvar=None, diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index d9010a9f4f9..9f3576e73fc 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -1,6 +1,7 @@ import abc import os from copy import deepcopy +from datetime import datetime from typing import ( TYPE_CHECKING, Any, @@ -20,6 +21,7 @@ from dbt import selected_resources from dbt.adapters.base.column import Column +from dbt.adapters.base.relation import EventTimeFilter from dbt.adapters.contracts.connection import AdapterResponse from dbt.adapters.exceptions import MissingConfigError from dbt.adapters.factory import ( @@ -27,7 +29,7 @@ get_adapter_package_names, get_adapter_type_names, ) -from dbt.artifacts.resources import NodeVersion, RefArgs +from dbt.artifacts.resources import NodeConfig, NodeVersion, RefArgs from dbt.clients.jinja import ( MacroGenerator, MacroStack, @@ -230,6 +232,29 @@ def Relation(self): def resolve_limit(self) -> Optional[int]: return 0 if getattr(self.config.args, "EMPTY", False) else None + def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]: + event_time_filter = None + if ( + isinstance(target.config, NodeConfig) + and target.config.event_time + and self.model.config.materialized == "incremental" + and self.model.config.get("strategy") == "microbatch" + ): + start = getattr(self.config.args, "EVENT_TIME_START", None) + start = datetime.strptime(start, "%Y-%m-%d") if start else None + + end = getattr(self.config.args, "EVENT_TIME_END", None) + end = datetime.strptime(end, "%Y-%m-%d") if end else None + + if start is not None or end is not None: + event_time_filter = EventTimeFilter( + field_name=target.config.event_time, + start=start, + end=end, + ) + + return event_time_filter + @abc.abstractmethod def __call__(self, *args: str) -> Union[str, RelationProxy, MetricReference]: pass @@ -545,7 +570,11 @@ def resolve( def create_relation(self, target_model: ManifestNode) -> RelationProxy: if target_model.is_ephemeral_model: self.model.set_cte(target_model.unique_id, None) - return self.Relation.create_ephemeral_from(target_model, limit=self.resolve_limit) + return self.Relation.create_ephemeral_from( + target_model, + limit=self.resolve_limit, + event_time_filter=self.resolve_event_time_filter(target_model), + ) elif ( hasattr(target_model, "defer_relation") and target_model.defer_relation @@ -563,10 +592,18 @@ def create_relation(self, target_model: ManifestNode) -> RelationProxy: ) ): return self.Relation.create_from( - self.config, target_model.defer_relation, limit=self.resolve_limit + self.config, + target_model.defer_relation, + limit=self.resolve_limit, + event_time_filter=self.resolve_event_time_filter(target_model), ) else: - return self.Relation.create_from(self.config, target_model, limit=self.resolve_limit) + return self.Relation.create_from( + self.config, + target_model, + limit=self.resolve_limit, + event_time_filter=self.resolve_event_time_filter(target_model), + ) def validate( self, @@ -633,7 +670,12 @@ def resolve(self, source_name: str, table_name: str): target_kind="source", disabled=(isinstance(target_source, Disabled)), ) - return self.Relation.create_from(self.config, target_source, limit=self.resolve_limit) + return self.Relation.create_from( + self.config, + target_source, + limit=self.resolve_limit, + event_time_filter=self.resolve_event_time_filter(target_source), + ) class RuntimeUnitTestSourceResolver(BaseSourceResolver): diff --git a/pairing.md b/pairing.md new file mode 100644 index 00000000000..c33904047f6 --- /dev/null +++ b/pairing.md @@ -0,0 +1,20 @@ +### Requirements + +- [p0*] It is possible to configure a top-level event_time key as a top-level property of a model + - TODO: consider backward compatability; perhaps call it an Any and do more aggressive validation + skipping if not str + +- [p0*] It is possible to hard-core a lower and upper bound time window to apply to all microbatch runs within an invocation via CLI flags (--event-start-time, --event-end-time) + - default: open on the left, closed on the right + - 1 < x <= 2 is open on the left, closed on the right + - so t=1, t=5 → update [2,3,4,5] + +- It is possible to *automatically* read (via `ref` and `source`) just the “new” data for inputs with `event_time` defined in the context of a microbatch model + - [p0*] “New” data is defined by dynamic checkpoints: current_timestamp as upper bound, lower bound as a partition-aware offset of that + +- [p0*] It is possible to configure a “lookback period” that applies to the read window of a microbatch model. + +- [p0*] It is possible to efficiently *write* entire partitions representing the newly computed data for a given microbatch model run. + - https://docs.getdbt.com/docs/build/incremental-strategy + - Target warehouses: + - [p0*]`insert_overwrite` dbt-bigquery, dbt-spark, dbt-databricks + - [p0*]`delete+insert` dbt-snowflake diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py new file mode 100644 index 00000000000..fbccf772c87 --- /dev/null +++ b/tests/functional/microbatch/test_microbatch.py @@ -0,0 +1,76 @@ +import pytest + +from dbt.tests.util import relation_from_name, run_dbt + +input_model_sql = """ +{{ config(event_time='event_time') }} + +select 1 as id, DATE '2020-01-01' as event_time, 'invalid' as status +union all +select 2 as id, DATE '2020-01-02' as event_time, 'success' as status +union all +select 3 as id, DATE '2020-01-03' as event_time, 'failed' as status +""" + +microbatch_model_sql = """ +{{ config(materialized='incremental', strategy='microbatch', event_time='event_time') }} +select * from {{ ref('input_model') }} +""" + +microbatch_model_yml = """ +models: + - name: microbatch_model + columns: + - name: status + tests: + - accepted_values: + values: ['success', 'failed'] +""" + + +class TestMicrobatch: + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_sql, + "microbatch.yml": microbatch_model_yml, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + + if result[0] != expected_row_count: + # running show for debugging + run_dbt(["show", "--inline", f"select * from {relation}"]) + + assert result[0] == expected_row_count + + def test_run_with_event_time(self, project): + # run without --event-time-start or --event-time-end - 3 expected rows in output + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # build model >= 2020-01-02 + run_dbt(["run", "--event-time-start", "2020-01-02", "--full-refresh"]) + self.assert_row_count(project, "microbatch_model", 2) + + # build model < 2020-01-03 + run_dbt(["run", "--event-time-end", "2020-01-03", "--full-refresh"]) + self.assert_row_count(project, "microbatch_model", 2) + + # build model between 2020-01-02 >= event_time < 2020-01-03 + run_dbt( + [ + "run", + "--event-time-start", + "2020-01-02", + "--event-time-end", + "2020-01-03", + "--full-refresh", + ] + ) + self.assert_row_count(project, "microbatch_model", 1) + + # results = run_dbt(["test", "--select", "microbatch_model", "--event-time-start", "2020-05-01", "--event-time-end", "2020-05-03"]) From 4c8528b8f370fe665a83bd04d996a0892da7beed Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 22 Aug 2024 16:36:17 -0400 Subject: [PATCH 02/38] dbt-adapters testing against event-time-ref-filtering --- dev-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 20605e632b8..c4746851761 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,4 +1,4 @@ -git+https://github.com/dbt-labs/dbt-adapters.git@main +git+https://github.com/dbt-labs/dbt-adapters.git@event-time-ref-filtering git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git@main git+https://github.com/dbt-labs/dbt-postgres.git@main From f5d5bb6fcebc0189a3407cf3db6c10553375f82f Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 3 Sep 2024 15:36:08 -0400 Subject: [PATCH 03/38] fix TestList --- tests/functional/list/test_list.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/functional/list/test_list.py b/tests/functional/list/test_list.py index 145b4e58cb9..ab641782ad9 100644 --- a/tests/functional/list/test_list.py +++ b/tests/functional/list/test_list.py @@ -79,6 +79,7 @@ def expect_snapshot_output(self, happy_path_project): # noqa: F811 "incremental_strategy": None, "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, + "event_time": None, }, "unique_id": "snapshot.test.my_snapshot", "original_file_path": normalize("snapshots/snapshot.sql"), @@ -121,6 +122,7 @@ def expect_analyses_output(self): "incremental_strategy": None, "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, + "event_time": None, }, "unique_id": "analysis.test.a", "original_file_path": normalize("analyses/a.sql"), @@ -182,6 +184,7 @@ def expect_model_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, }, "original_file_path": normalize("models/ephemeral.sql"), "unique_id": "model.test.ephemeral", @@ -220,6 +223,7 @@ def expect_model_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, }, "original_file_path": normalize("models/incremental.sql"), "unique_id": "model.test.incremental", @@ -258,6 +262,7 @@ def expect_model_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, }, "original_file_path": normalize("models/sub/inner.sql"), "unique_id": "model.test.inner", @@ -296,6 +301,7 @@ def expect_model_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, }, "original_file_path": normalize("models/metricflow_time_spine.sql"), "unique_id": "model.test.metricflow_time_spine", @@ -334,6 +340,7 @@ def expect_model_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, }, "original_file_path": normalize("models/metricflow_time_spine_second.sql"), "unique_id": "model.test.metricflow_time_spine_second", @@ -372,6 +379,7 @@ def expect_model_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, }, "original_file_path": normalize("models/outer.sql"), "unique_id": "model.test.outer", @@ -490,6 +498,7 @@ def expect_seed_output(self): "incremental_strategy": None, "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, + "event_time": None, }, "depends_on": {"macros": []}, "unique_id": "seed.test.seed", From 19ad7c6a0735b0e6326e308607b3944d76eb5fba Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 3 Sep 2024 16:01:29 -0500 Subject: [PATCH 04/38] Checkpoint --- core/dbt/artifacts/resources/types.py | 7 ++ core/dbt/artifacts/resources/v1/config.py | 4 +- core/dbt/context/providers.py | 90 +++++++++++++++++-- pairing.md | 3 +- .../functional/microbatch/test_microbatch.py | 54 +++++++---- 5 files changed, 133 insertions(+), 25 deletions(-) diff --git a/core/dbt/artifacts/resources/types.py b/core/dbt/artifacts/resources/types.py index c0ab5341e4c..737a123f136 100644 --- a/core/dbt/artifacts/resources/types.py +++ b/core/dbt/artifacts/resources/types.py @@ -68,3 +68,10 @@ class TimePeriod(StrEnum): def plural(self) -> str: return str(self) + "s" + + +class PartitionGrain(StrEnum): + hour = "hour" + day = "day" + month = "month" + year = "year" diff --git a/core/dbt/artifacts/resources/v1/config.py b/core/dbt/artifacts/resources/v1/config.py index 050b37cff94..9ee848a88a5 100644 --- a/core/dbt/artifacts/resources/v1/config.py +++ b/core/dbt/artifacts/resources/v1/config.py @@ -7,7 +7,7 @@ from dbt import hooks from dbt.artifacts.resources.base import Docs -from dbt.artifacts.resources.types import ModelHookType +from dbt.artifacts.resources.types import ModelHookType, PartitionGrain from dbt.artifacts.utils.validation import validate_color from dbt_common.contracts.config.base import BaseConfig, CompareBehavior, MergeBehavior from dbt_common.contracts.config.materialization import OnConfigurationChangeOption @@ -80,6 +80,8 @@ class NodeConfig(NodeAndTestConfig): # 'mergebehavior' dictionary materialized: str = "view" incremental_strategy: Optional[str] = None + partition_grain: Optional[PartitionGrain] = None + partition_lookback: int = 0 persist_docs: Dict[str, Any] = field(default_factory=dict) post_hook: List[Hook] = field( default_factory=list, diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 9f3576e73fc..c381f931160 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -1,7 +1,7 @@ import abc import os from copy import deepcopy -from datetime import datetime +from datetime import datetime, timedelta from typing import ( TYPE_CHECKING, Any, @@ -17,6 +17,7 @@ Union, ) +import pytz from typing_extensions import Protocol from dbt import selected_resources @@ -30,6 +31,7 @@ get_adapter_type_names, ) from dbt.artifacts.resources import NodeConfig, NodeVersion, RefArgs +from dbt.artifacts.resources.types import PartitionGrain from dbt.clients.jinja import ( MacroGenerator, MacroStack, @@ -232,19 +234,90 @@ def Relation(self): def resolve_limit(self) -> Optional[int]: return 0 if getattr(self.config.args, "EMPTY", False) else None + def _build_end_time(self, is_incremental: bool) -> Optional[datetime]: + if not is_incremental: + return None + else: + return datetime.now(tz=pytz.utc) + + def _build_start_time( + self, checkpoint: Optional[datetime], is_incremental: bool + ) -> Optional[datetime]: + if not is_incremental or checkpoint is None: + return None + + assert isinstance(self.model.config, NodeConfig) + grain = self.model.config.partition_grain + if grain is None: + # TODO: Better error message + raise DbtRuntimeError("Partition grain not specified") + + lookback = self.model.config.partition_lookback + if grain == PartitionGrain.hour: + start = datetime( + checkpoint.year, + checkpoint.month, + checkpoint.day, + checkpoint.hour, + 0, + 0, + 0, + pytz.utc, + ) - timedelta(hours=lookback) + elif grain == PartitionGrain.day: + start = datetime( + checkpoint.year, checkpoint.month, checkpoint.day, 0, 0, 0, 0, pytz.utc + ) - timedelta(days=lookback) + elif grain == PartitionGrain.month: + start = datetime(checkpoint.year, checkpoint.month, 1, 0, 0, 0, 0, pytz.utc) + for _ in range(lookback): + start = start - timedelta(days=1) + start = datetime(start.year, start.month, 1, 0, 0, 0, 0, pytz.utc) + elif grain == PartitionGrain.year: + start = datetime(checkpoint.year - lookback, 1, 1, 0, 0, 0, 0, pytz.utc) + else: + # TODO: Better error message + raise DbtInternalError("This should be impossible :eeek:") + + return start + + def _is_incremental(self) -> bool: + relation_info = self.Relation.create_from(self.config, self.model) + relation = self.db_wrapper.get_relation( + relation_info.database, relation_info.schema, relation_info.name + ) + return ( + relation is not None + and relation.type == "table" + and self.model.config.materialized == "incremental" + and not ( + getattr(self.config.args, "FULL_REFRESH", False) or self.model.config.full_refresh + ) + ) + def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]: event_time_filter = None if ( isinstance(target.config, NodeConfig) and target.config.event_time and self.model.config.materialized == "incremental" - and self.model.config.get("strategy") == "microbatch" + # and self.model.config.incremental_strategy == "microbatch" ): - start = getattr(self.config.args, "EVENT_TIME_START", None) - start = datetime.strptime(start, "%Y-%m-%d") if start else None - + is_incremental = self._is_incremental() + print(f"####------#### is_incremental: {is_incremental}") end = getattr(self.config.args, "EVENT_TIME_END", None) - end = datetime.strptime(end, "%Y-%m-%d") if end else None + end = ( + datetime.strptime(end, "%Y-%m-%d") + if end + else self._build_end_time(is_incremental=is_incremental) + ) + + start = getattr(self.config.args, "EVENT_TIME_START", None) + start = ( + datetime.strptime(start, "%Y-%m-%d") + if start + else self._build_start_time(checkpoint=end, is_incremental=is_incremental) + ) if start is not None or end is not None: event_time_filter = EventTimeFilter( @@ -252,7 +325,10 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF start=start, end=end, ) - + if event_time_filter is not None: + print(event_time_filter.render()) + else: + print("No filter present! Oh no!") return event_time_filter @abc.abstractmethod diff --git a/pairing.md b/pairing.md index c33904047f6..0302bfd8e2b 100644 --- a/pairing.md +++ b/pairing.md @@ -3,10 +3,11 @@ - [p0*] It is possible to configure a top-level event_time key as a top-level property of a model - TODO: consider backward compatability; perhaps call it an Any and do more aggressive validation + skipping if not str -- [p0*] It is possible to hard-core a lower and upper bound time window to apply to all microbatch runs within an invocation via CLI flags (--event-start-time, --event-end-time) +- [p0*] It is possible to hard-code a lower and upper bound time window to apply to all microbatch runs within an invocation via CLI flags (--event-start-time, --event-end-time) - default: open on the left, closed on the right - 1 < x <= 2 is open on the left, closed on the right - so t=1, t=5 → update [2,3,4,5] + - TODO: consider a custom click type for parsing datetimes from the command line similar to run error options - It is possible to *automatically* read (via `ref` and `source`) just the “new” data for inputs with `event_time` defined in the context of a microbatch model - [p0*] “New” data is defined by dynamic checkpoints: current_timestamp as upper bound, lower bound as a partition-aware offset of that diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index fbccf772c87..bb36a5f08b9 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -1,40 +1,30 @@ import pytest +from freezegun import freeze_time from dbt.tests.util import relation_from_name, run_dbt input_model_sql = """ {{ config(event_time='event_time') }} -select 1 as id, DATE '2020-01-01' as event_time, 'invalid' as status +select 1 as id, DATE '2020-01-01' as event_time union all -select 2 as id, DATE '2020-01-02' as event_time, 'success' as status +select 2 as id, DATE '2020-01-02' as event_time union all -select 3 as id, DATE '2020-01-03' as event_time, 'failed' as status +select 3 as id, DATE '2020-01-03' as event_time """ microbatch_model_sql = """ -{{ config(materialized='incremental', strategy='microbatch', event_time='event_time') }} +{{ config(materialized='incremental', event_time='event_time', partition_grain='day') }} select * from {{ ref('input_model') }} """ -microbatch_model_yml = """ -models: - - name: microbatch_model - columns: - - name: status - tests: - - accepted_values: - values: ['success', 'failed'] -""" - -class TestMicrobatch: +class TestMicrobatchCLI: @pytest.fixture(scope="class") def models(self): return { "input_model.sql": input_model_sql, "microbatch_model.sql": microbatch_model_sql, - "microbatch.yml": microbatch_model_yml, } def assert_row_count(self, project, relation_name: str, expected_row_count: int): @@ -74,3 +64,35 @@ def test_run_with_event_time(self, project): self.assert_row_count(project, "microbatch_model", 1) # results = run_dbt(["test", "--select", "microbatch_model", "--event-time-start", "2020-05-01", "--event-time-end", "2020-05-03"]) + + +class TestMicroBatchBoundsDefault: + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + + if result[0] != expected_row_count: + # running show for debugging + run_dbt(["show", "--inline", f"select * from {relation}"]) + + assert result[0] == expected_row_count + + def test_run_with_event_time(self, project): + # initial run + with freeze_time("2020-01-01 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 1) + + # our partition grain is "day" so running the same day without new data should produce the same results + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # results = run_dbt(["test", "--select", "microbatch_model", "--event-time-start", "2020-05-01", "--event-time-end", "2020-05-03"]) From a57481f17e67597b2e86b982200fd09eaae17f2d Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 3 Sep 2024 19:37:34 -0400 Subject: [PATCH 05/38] fix tests --- core/dbt/context/providers.py | 11 +++-- .../functional/artifacts/expected_manifest.py | 9 +++++ tests/functional/list/test_list.py | 18 +++++++++ .../functional/microbatch/test_microbatch.py | 40 ++++++++++++++----- tests/unit/contracts/graph/test_nodes.py | 1 + .../unit/contracts/graph/test_nodes_parsed.py | 13 ++++++ 6 files changed, 78 insertions(+), 14 deletions(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index c381f931160..c1ba52982e7 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -325,10 +325,13 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF start=start, end=end, ) - if event_time_filter is not None: - print(event_time_filter.render()) - else: - print("No filter present! Oh no!") + + # Microbatch debugging + # if event_time_filter is not None: + # print(event_time_filter.render()) + # else: + # print("No filter present! Oh no!") + return event_time_filter @abc.abstractmethod diff --git a/tests/functional/artifacts/expected_manifest.py b/tests/functional/artifacts/expected_manifest.py index 7098a4f4fea..7844d17cf02 100644 --- a/tests/functional/artifacts/expected_manifest.py +++ b/tests/functional/artifacts/expected_manifest.py @@ -39,6 +39,9 @@ def get_rendered_model_config(**updates): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "event_time": None, + "partition_lookback": 0, + "partition_grain": None, } result.update(updates) return result @@ -74,6 +77,9 @@ def get_rendered_seed_config(**updates): "incremental_strategy": None, "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, + "event_time": None, + "partition_lookback": 0, + "partition_grain": None, } result.update(updates) return result @@ -114,6 +120,9 @@ def get_rendered_snapshot_config(**updates): "incremental_strategy": None, "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, + "event_time": None, + "partition_lookback": 0, + "partition_grain": None, } result.update(updates) return result diff --git a/tests/functional/list/test_list.py b/tests/functional/list/test_list.py index ab641782ad9..341f20ddded 100644 --- a/tests/functional/list/test_list.py +++ b/tests/functional/list/test_list.py @@ -80,6 +80,8 @@ def expect_snapshot_output(self, happy_path_project): # noqa: F811 "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "event_time": None, + "partition_lookback": 0, + "partition_grain": None, }, "unique_id": "snapshot.test.my_snapshot", "original_file_path": normalize("snapshots/snapshot.sql"), @@ -123,6 +125,8 @@ def expect_analyses_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "event_time": None, + "partition_lookback": 0, + "partition_grain": None, }, "unique_id": "analysis.test.a", "original_file_path": normalize("analyses/a.sql"), @@ -185,6 +189,8 @@ def expect_model_output(self): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, + "partition_lookback": 0, + "partition_grain": None, }, "original_file_path": normalize("models/ephemeral.sql"), "unique_id": "model.test.ephemeral", @@ -224,6 +230,8 @@ def expect_model_output(self): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, + "partition_lookback": 0, + "partition_grain": None, }, "original_file_path": normalize("models/incremental.sql"), "unique_id": "model.test.incremental", @@ -263,6 +271,8 @@ def expect_model_output(self): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, + "partition_lookback": 0, + "partition_grain": None, }, "original_file_path": normalize("models/sub/inner.sql"), "unique_id": "model.test.inner", @@ -302,6 +312,8 @@ def expect_model_output(self): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, + "partition_lookback": 0, + "partition_grain": None, }, "original_file_path": normalize("models/metricflow_time_spine.sql"), "unique_id": "model.test.metricflow_time_spine", @@ -341,6 +353,8 @@ def expect_model_output(self): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, + "partition_lookback": 0, + "partition_grain": None, }, "original_file_path": normalize("models/metricflow_time_spine_second.sql"), "unique_id": "model.test.metricflow_time_spine_second", @@ -380,6 +394,8 @@ def expect_model_output(self): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, + "partition_lookback": 0, + "partition_grain": None, }, "original_file_path": normalize("models/outer.sql"), "unique_id": "model.test.outer", @@ -499,6 +515,8 @@ def expect_seed_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "event_time": None, + "partition_lookback": 0, + "partition_grain": None, }, "depends_on": {"macros": []}, "unique_id": "seed.test.seed", diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index bb36a5f08b9..aa7ffc56c66 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -4,17 +4,17 @@ from dbt.tests.util import relation_from_name, run_dbt input_model_sql = """ -{{ config(event_time='event_time') }} +{{ config(materialized='table', event_time='event_time') }} -select 1 as id, DATE '2020-01-01' as event_time +select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time union all -select 2 as id, DATE '2020-01-02' as event_time +select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time union all -select 3 as id, DATE '2020-01-03' as event_time +select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time """ microbatch_model_sql = """ -{{ config(materialized='incremental', event_time='event_time', partition_grain='day') }} +{{ config(materialized='incremental', incremental_strategy='merge', unique_key='id', event_time='event_time', partition_grain='day') }} select * from {{ ref('input_model') }} """ @@ -63,8 +63,6 @@ def test_run_with_event_time(self, project): ) self.assert_row_count(project, "microbatch_model", 1) - # results = run_dbt(["test", "--select", "microbatch_model", "--event-time-start", "2020-05-01", "--event-time-end", "2020-05-03"]) - class TestMicroBatchBoundsDefault: @pytest.fixture(scope="class") @@ -85,14 +83,36 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int) assert result[0] == expected_row_count def test_run_with_event_time(self, project): - # initial run + # initial run -- backfills all data with freeze_time("2020-01-01 13:57:00"): run_dbt(["run"]) - self.assert_row_count(project, "microbatch_model", 1) + self.assert_row_count(project, "microbatch_model", 3) # our partition grain is "day" so running the same day without new data should produce the same results with freeze_time("2020-01-03 14:57:00"): run_dbt(["run"]) self.assert_row_count(project, "microbatch_model", 3) - # results = run_dbt(["test", "--select", "microbatch_model", "--event-time-start", "2020-05-01", "--event-time-end", "2020-05-03"]) + # add next two days of data + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.run_sql( + f"insert into {test_schema_relation}.input_model(id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" + ) + self.assert_row_count(project, "input_model", 5) + + # re-run without changing current time => no insert + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 3) + + # re-run by advancing time by one day changing current time => insert 1 row + with freeze_time("2020-01-04 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 4) + + # re-run by advancing time by one more day changing current time => insert 1 more row + with freeze_time("2020-01-05 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 5) diff --git a/tests/unit/contracts/graph/test_nodes.py b/tests/unit/contracts/graph/test_nodes.py index a498b99dcbc..bf6f7f61100 100644 --- a/tests/unit/contracts/graph/test_nodes.py +++ b/tests/unit/contracts/graph/test_nodes.py @@ -183,6 +183,7 @@ def basic_compiled_dict(): "contract": {"enforced": False, "alias_types": True}, "docs": {"show": True}, "access": "protected", + "partition_lookback": 0, }, "docs": {"show": True}, "columns": {}, diff --git a/tests/unit/contracts/graph/test_nodes_parsed.py b/tests/unit/contracts/graph/test_nodes_parsed.py index ebbe2443771..dae32dc5e1c 100644 --- a/tests/unit/contracts/graph/test_nodes_parsed.py +++ b/tests/unit/contracts/graph/test_nodes_parsed.py @@ -100,6 +100,7 @@ def populated_node_config_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", + "partition_lookback": 0, } @@ -187,6 +188,7 @@ def base_parsed_model_dict(): "contract": {"enforced": False, "alias_types": True}, "packages": [], "access": "protected", + "partition_lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -297,6 +299,7 @@ def complex_parsed_model_dict(): "contract": {"enforced": False, "alias_types": True}, "packages": [], "access": "protected", + "partition_lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -520,6 +523,7 @@ def basic_parsed_seed_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], + "partition_lookback": 0, }, "docs": {"show": True}, "columns": {}, @@ -611,6 +615,7 @@ def complex_parsed_seed_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], + "partition_lookback": 0, }, "docs": {"show": True}, "columns": { @@ -818,6 +823,7 @@ def base_parsed_hook_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], + "partition_lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -899,6 +905,7 @@ def complex_parsed_hook_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], + "partition_lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -1253,6 +1260,7 @@ def basic_timestamp_snapshot_config_dict(): "packages": [], "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, + "partition_lookback": 0, } @@ -1291,6 +1299,7 @@ def complex_timestamp_snapshot_config_dict(): "packages": [], "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, + "partition_lookback": 0, } @@ -1357,6 +1366,7 @@ def basic_check_snapshot_config_dict(): "packages": [], "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, + "partition_lookback": 0, } @@ -1395,6 +1405,7 @@ def complex_set_snapshot_config_dict(): "packages": [], "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, + "partition_lookback": 0, } @@ -1519,6 +1530,7 @@ def basic_timestamp_snapshot_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], + "partition_lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -1621,6 +1633,7 @@ def basic_check_snapshot_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], + "partition_lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, From 25c10f75b8a9d95d40434a7fe66ce1521d9dc338 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 3 Sep 2024 19:39:29 -0400 Subject: [PATCH 06/38] add event_time_start params to build --- core/dbt/cli/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index 41bd44f2682..ea3846d7f7c 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -165,6 +165,8 @@ def cli(ctx, **kwargs): @click.pass_context @global_flags @p.empty +@p.event_time_start +@p.event_time_end @p.exclude @p.export_saved_queries @p.full_refresh @@ -792,8 +794,6 @@ def freshness(ctx, **kwargs): @p.store_failures @p.target_path @p.threads -# @p.event_time_start -# @p.event_time_end @p.vars @requires.postflight @requires.preflight From 699179f6c0b1a5d8810c6af115db11ca1313130f Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 3 Sep 2024 19:41:16 -0400 Subject: [PATCH 07/38] rename configs --- core/dbt/artifacts/resources/v1/config.py | 4 +-- core/dbt/context/providers.py | 4 +-- .../functional/artifacts/expected_manifest.py | 12 +++---- tests/functional/list/test_list.py | 36 +++++++++---------- .../functional/microbatch/test_microbatch.py | 2 +- tests/unit/contracts/graph/test_nodes.py | 2 +- .../unit/contracts/graph/test_nodes_parsed.py | 26 +++++++------- 7 files changed, 43 insertions(+), 43 deletions(-) diff --git a/core/dbt/artifacts/resources/v1/config.py b/core/dbt/artifacts/resources/v1/config.py index 9ee848a88a5..45fc8ebeb3f 100644 --- a/core/dbt/artifacts/resources/v1/config.py +++ b/core/dbt/artifacts/resources/v1/config.py @@ -80,8 +80,8 @@ class NodeConfig(NodeAndTestConfig): # 'mergebehavior' dictionary materialized: str = "view" incremental_strategy: Optional[str] = None - partition_grain: Optional[PartitionGrain] = None - partition_lookback: int = 0 + batch_size: Optional[PartitionGrain] = None + lookback: int = 0 persist_docs: Dict[str, Any] = field(default_factory=dict) post_hook: List[Hook] = field( default_factory=list, diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index c1ba52982e7..d657b510192 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -247,12 +247,12 @@ def _build_start_time( return None assert isinstance(self.model.config, NodeConfig) - grain = self.model.config.partition_grain + grain = self.model.config.batch_size if grain is None: # TODO: Better error message raise DbtRuntimeError("Partition grain not specified") - lookback = self.model.config.partition_lookback + lookback = self.model.config.lookback if grain == PartitionGrain.hour: start = datetime( checkpoint.year, diff --git a/tests/functional/artifacts/expected_manifest.py b/tests/functional/artifacts/expected_manifest.py index 7844d17cf02..6de0001430f 100644 --- a/tests/functional/artifacts/expected_manifest.py +++ b/tests/functional/artifacts/expected_manifest.py @@ -40,8 +40,8 @@ def get_rendered_model_config(**updates): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, - "partition_lookback": 0, - "partition_grain": None, + "lookback": 0, + "batch_size": None, } result.update(updates) return result @@ -78,8 +78,8 @@ def get_rendered_seed_config(**updates): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "event_time": None, - "partition_lookback": 0, - "partition_grain": None, + "lookback": 0, + "batch_size": None, } result.update(updates) return result @@ -121,8 +121,8 @@ def get_rendered_snapshot_config(**updates): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "event_time": None, - "partition_lookback": 0, - "partition_grain": None, + "lookback": 0, + "batch_size": None, } result.update(updates) return result diff --git a/tests/functional/list/test_list.py b/tests/functional/list/test_list.py index 341f20ddded..fdd838891ab 100644 --- a/tests/functional/list/test_list.py +++ b/tests/functional/list/test_list.py @@ -80,8 +80,8 @@ def expect_snapshot_output(self, happy_path_project): # noqa: F811 "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "event_time": None, - "partition_lookback": 0, - "partition_grain": None, + "lookback": 0, + "batch_size": None, }, "unique_id": "snapshot.test.my_snapshot", "original_file_path": normalize("snapshots/snapshot.sql"), @@ -125,8 +125,8 @@ def expect_analyses_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "event_time": None, - "partition_lookback": 0, - "partition_grain": None, + "lookback": 0, + "batch_size": None, }, "unique_id": "analysis.test.a", "original_file_path": normalize("analyses/a.sql"), @@ -189,8 +189,8 @@ def expect_model_output(self): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, - "partition_lookback": 0, - "partition_grain": None, + "lookback": 0, + "batch_size": None, }, "original_file_path": normalize("models/ephemeral.sql"), "unique_id": "model.test.ephemeral", @@ -230,8 +230,8 @@ def expect_model_output(self): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, - "partition_lookback": 0, - "partition_grain": None, + "lookback": 0, + "batch_size": None, }, "original_file_path": normalize("models/incremental.sql"), "unique_id": "model.test.incremental", @@ -271,8 +271,8 @@ def expect_model_output(self): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, - "partition_lookback": 0, - "partition_grain": None, + "lookback": 0, + "batch_size": None, }, "original_file_path": normalize("models/sub/inner.sql"), "unique_id": "model.test.inner", @@ -312,8 +312,8 @@ def expect_model_output(self): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, - "partition_lookback": 0, - "partition_grain": None, + "lookback": 0, + "batch_size": None, }, "original_file_path": normalize("models/metricflow_time_spine.sql"), "unique_id": "model.test.metricflow_time_spine", @@ -353,8 +353,8 @@ def expect_model_output(self): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, - "partition_lookback": 0, - "partition_grain": None, + "lookback": 0, + "batch_size": None, }, "original_file_path": normalize("models/metricflow_time_spine_second.sql"), "unique_id": "model.test.metricflow_time_spine_second", @@ -394,8 +394,8 @@ def expect_model_output(self): "contract": {"enforced": False, "alias_types": True}, "access": "protected", "event_time": None, - "partition_lookback": 0, - "partition_grain": None, + "lookback": 0, + "batch_size": None, }, "original_file_path": normalize("models/outer.sql"), "unique_id": "model.test.outer", @@ -515,8 +515,8 @@ def expect_seed_output(self): "docs": {"node_color": None, "show": True}, "contract": {"enforced": False, "alias_types": True}, "event_time": None, - "partition_lookback": 0, - "partition_grain": None, + "lookback": 0, + "batch_size": None, }, "depends_on": {"macros": []}, "unique_id": "seed.test.seed", diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index aa7ffc56c66..f4c341decde 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -14,7 +14,7 @@ """ microbatch_model_sql = """ -{{ config(materialized='incremental', incremental_strategy='merge', unique_key='id', event_time='event_time', partition_grain='day') }} +{{ config(materialized='incremental', incremental_strategy='merge', unique_key='id', event_time='event_time', batch_size='day') }} select * from {{ ref('input_model') }} """ diff --git a/tests/unit/contracts/graph/test_nodes.py b/tests/unit/contracts/graph/test_nodes.py index bf6f7f61100..a67ca1f5efc 100644 --- a/tests/unit/contracts/graph/test_nodes.py +++ b/tests/unit/contracts/graph/test_nodes.py @@ -183,7 +183,7 @@ def basic_compiled_dict(): "contract": {"enforced": False, "alias_types": True}, "docs": {"show": True}, "access": "protected", - "partition_lookback": 0, + "lookback": 0, }, "docs": {"show": True}, "columns": {}, diff --git a/tests/unit/contracts/graph/test_nodes_parsed.py b/tests/unit/contracts/graph/test_nodes_parsed.py index dae32dc5e1c..7655b7aa444 100644 --- a/tests/unit/contracts/graph/test_nodes_parsed.py +++ b/tests/unit/contracts/graph/test_nodes_parsed.py @@ -100,7 +100,7 @@ def populated_node_config_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "access": "protected", - "partition_lookback": 0, + "lookback": 0, } @@ -188,7 +188,7 @@ def base_parsed_model_dict(): "contract": {"enforced": False, "alias_types": True}, "packages": [], "access": "protected", - "partition_lookback": 0, + "lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -299,7 +299,7 @@ def complex_parsed_model_dict(): "contract": {"enforced": False, "alias_types": True}, "packages": [], "access": "protected", - "partition_lookback": 0, + "lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -523,7 +523,7 @@ def basic_parsed_seed_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], - "partition_lookback": 0, + "lookback": 0, }, "docs": {"show": True}, "columns": {}, @@ -615,7 +615,7 @@ def complex_parsed_seed_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], - "partition_lookback": 0, + "lookback": 0, }, "docs": {"show": True}, "columns": { @@ -823,7 +823,7 @@ def base_parsed_hook_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], - "partition_lookback": 0, + "lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -905,7 +905,7 @@ def complex_parsed_hook_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], - "partition_lookback": 0, + "lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -1260,7 +1260,7 @@ def basic_timestamp_snapshot_config_dict(): "packages": [], "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, - "partition_lookback": 0, + "lookback": 0, } @@ -1299,7 +1299,7 @@ def complex_timestamp_snapshot_config_dict(): "packages": [], "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, - "partition_lookback": 0, + "lookback": 0, } @@ -1366,7 +1366,7 @@ def basic_check_snapshot_config_dict(): "packages": [], "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, - "partition_lookback": 0, + "lookback": 0, } @@ -1405,7 +1405,7 @@ def complex_set_snapshot_config_dict(): "packages": [], "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, - "partition_lookback": 0, + "lookback": 0, } @@ -1530,7 +1530,7 @@ def basic_timestamp_snapshot_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], - "partition_lookback": 0, + "lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, @@ -1633,7 +1633,7 @@ def basic_check_snapshot_dict(): "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, "packages": [], - "partition_lookback": 0, + "lookback": 0, }, "docs": {"show": True}, "contract": {"enforced": False, "alias_types": True}, From 2d19d1c34346e7f1e18a6ba966beff6199ef9923 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 5 Sep 2024 10:02:10 -0500 Subject: [PATCH 08/38] Gate resolve_event_time_filter via micro batch strategy and fix strptime usage --- core/dbt/context/providers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index d657b510192..a8d4c375082 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -301,20 +301,20 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF isinstance(target.config, NodeConfig) and target.config.event_time and self.model.config.materialized == "incremental" - # and self.model.config.incremental_strategy == "microbatch" + and self.model.config.incremental_strategy == "microbatch" ): is_incremental = self._is_incremental() print(f"####------#### is_incremental: {is_incremental}") end = getattr(self.config.args, "EVENT_TIME_END", None) end = ( - datetime.strptime(end, "%Y-%m-%d") + datetime.strptime(end, "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC) if end else self._build_end_time(is_incremental=is_incremental) ) start = getattr(self.config.args, "EVENT_TIME_START", None) start = ( - datetime.strptime(start, "%Y-%m-%d") + datetime.strptime(start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC) if start else self._build_start_time(checkpoint=end, is_incremental=is_incremental) ) From 57b1353daf8ed66c320332418df96f9fb3e06a5b Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 5 Sep 2024 10:03:26 -0500 Subject: [PATCH 09/38] Add unit test for resolve_event_time_filter --- tests/unit/context/test_providers.py | 136 ++++++++++++++++++++++++++- 1 file changed, 135 insertions(+), 1 deletion(-) diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index 224675143e4..a535dff2591 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -1,11 +1,18 @@ +from datetime import datetime +from typing import Optional from unittest import mock import pytest +import pytz +from freezegun import freeze_time +from pytest_mock import MockerFixture from dbt.adapters.base import BaseRelation -from dbt.artifacts.resources import Quoting +from dbt.artifacts.resources import NodeConfig, Quoting +from dbt.artifacts.resources.types import PartitionGrain from dbt.context.providers import ( BaseResolver, + EventTimeFilter, RuntimeRefResolver, RuntimeSourceResolver, ) @@ -34,6 +41,133 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): assert resolver.resolve_limit == expected_resolve_limit + @freeze_time("2024-09-05 08:56:00") + @pytest.mark.parametrize( + "is_incremental,materialized,incremental_strategy,event_time_end,event_time_start,expected_filter", + [ + (True, "table", "microbatch", None, None, None), + (True, "incremental", "merge", None, None, None), + ( + True, + "incremental", + "microbatch", + None, + None, + EventTimeFilter( + field_name="created_at", + end=datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC), + start=datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC), + ), + ), + ( + True, + "incremental", + "microbatch", + "2024-08-01 08:11:00", + None, + EventTimeFilter( + field_name="created_at", + end=datetime(2024, 8, 1, 8, 11, 0, 0, pytz.UTC), + start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), + ), + ), + ( + True, + "incremental", + "microbatch", + None, + "2024-08-01 00:00:00", + EventTimeFilter( + field_name="created_at", + end=datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC), + start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), + ), + ), + ( + True, + "incremental", + "microbatch", + "2024-09-01 00:00:00", + "2024-08-01 00:00:00", + EventTimeFilter( + field_name="created_at", + end=datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), + start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), + ), + ), + (False, "incremental", "microbatch", None, None, None), + ( + False, + "incremental", + "microbatch", + "2024-08-01 08:11:00", + None, + EventTimeFilter( + field_name="created_at", + end=datetime(2024, 8, 1, 8, 11, 0, 0, pytz.UTC), + start=None, + ), + ), + ( + False, + "incremental", + "microbatch", + None, + "2024-08-01 00:00:00", + EventTimeFilter( + field_name="created_at", + end=None, + start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), + ), + ), + ( + False, + "incremental", + "microbatch", + "2024-09-01 00:00:00", + "2024-08-01 00:00:00", + EventTimeFilter( + field_name="created_at", + end=datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), + start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), + ), + ), + ], + ) + def test_resolve_event_time_filter( + self, + mocker: MockerFixture, + resolver: ResolverSubclass, + is_incremental: bool, + materialized: str, + incremental_strategy: str, + event_time_end: Optional[str], + event_time_start: Optional[str], + expected_filter: Optional[EventTimeFilter], + ) -> None: + mocker.patch("dbt.context.providers.BaseResolver._is_incremental").return_value = ( + is_incremental + ) + target = mock.Mock() + target.config = mock.MagicMock(NodeConfig) + target.config.event_time = "created_at" + resolver.model.config = mock.MagicMock(NodeConfig) + resolver.model.config.materialized = materialized + resolver.model.config.incremental_strategy = incremental_strategy + resolver.model.config.batch_size = PartitionGrain.day + resolver.model.config.lookback = 0 + resolver.config.args.EVENT_TIME_END = event_time_end + resolver.config.args.EVENT_TIME_START = event_time_start + event_time_filter = resolver.resolve_event_time_filter(target=target) + + if expected_filter is not None: + assert event_time_filter is not None + assert event_time_filter.field_name == expected_filter.field_name + assert event_time_filter.end == expected_filter.end + assert event_time_filter.start == expected_filter.start + else: + assert event_time_filter is None + class TestRuntimeRefResolver: @pytest.fixture From e0bae27f9dee3c4d086f8f4ed50ce7428e1f87d5 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 5 Sep 2024 11:40:36 -0700 Subject: [PATCH 10/38] Additional unit tests for `resolve_event_time_filter` to ensure lookback + batch_size work --- tests/unit/context/test_providers.py | 84 ++++++++++++++++++++++++++-- 1 file changed, 78 insertions(+), 6 deletions(-) diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index a535dff2591..f835a93751e 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -43,16 +43,18 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): @freeze_time("2024-09-05 08:56:00") @pytest.mark.parametrize( - "is_incremental,materialized,incremental_strategy,event_time_end,event_time_start,expected_filter", + "is_incremental,materialized,incremental_strategy,event_time_end,event_time_start,batch_size,lookback,expected_filter", [ - (True, "table", "microbatch", None, None, None), - (True, "incremental", "merge", None, None, None), + (True, "table", "microbatch", None, None, PartitionGrain.day, 0, None), + (True, "incremental", "merge", None, None, PartitionGrain.day, 0, None), ( True, "incremental", "microbatch", None, None, + PartitionGrain.day, + 0, EventTimeFilter( field_name="created_at", end=datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC), @@ -65,6 +67,8 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", "2024-08-01 08:11:00", None, + PartitionGrain.day, + 0, EventTimeFilter( field_name="created_at", end=datetime(2024, 8, 1, 8, 11, 0, 0, pytz.UTC), @@ -77,6 +81,8 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", None, "2024-08-01 00:00:00", + PartitionGrain.day, + 0, EventTimeFilter( field_name="created_at", end=datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC), @@ -89,19 +95,23 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", "2024-09-01 00:00:00", "2024-08-01 00:00:00", + PartitionGrain.day, + 0, EventTimeFilter( field_name="created_at", end=datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), ), ), - (False, "incremental", "microbatch", None, None, None), + (False, "incremental", "microbatch", None, None, PartitionGrain.day, 0, None), ( False, "incremental", "microbatch", "2024-08-01 08:11:00", None, + PartitionGrain.day, + 0, EventTimeFilter( field_name="created_at", end=datetime(2024, 8, 1, 8, 11, 0, 0, pytz.UTC), @@ -114,6 +124,8 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", None, "2024-08-01 00:00:00", + PartitionGrain.day, + 0, EventTimeFilter( field_name="created_at", end=None, @@ -126,12 +138,70 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", "2024-09-01 00:00:00", "2024-08-01 00:00:00", + PartitionGrain.day, + 0, EventTimeFilter( field_name="created_at", end=datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), ), ), + ( + True, + "incremental", + "microbatch", + "2024-09-01 00:49:00", + None, + PartitionGrain.hour, + 1, + EventTimeFilter( + field_name="created_at", + end=datetime(2024, 9, 1, 0, 49, 0, 0, pytz.UTC), + start=datetime(2024, 8, 31, 23, 0, 0, 0, pytz.UTC), + ), + ), + ( + True, + "incremental", + "microbatch", + "2024-09-01 13:31:00", + None, + PartitionGrain.day, + 1, + EventTimeFilter( + field_name="created_at", + end=datetime(2024, 9, 1, 13, 31, 0, 0, pytz.UTC), + start=datetime(2024, 8, 31, 0, 0, 0, 0, pytz.UTC), + ), + ), + ( + True, + "incremental", + "microbatch", + "2024-01-23 12:30:00", + None, + PartitionGrain.month, + 1, + EventTimeFilter( + field_name="created_at", + end=datetime(2024, 1, 23, 12, 30, 0, 0, pytz.UTC), + start=datetime(2023, 12, 1, 0, 0, 0, 0, pytz.utc), + ), + ), + ( + True, + "incremental", + "microbatch", + "2024-01-23 12:30:00", + None, + PartitionGrain.year, + 1, + EventTimeFilter( + field_name="created_at", + end=datetime(2024, 1, 23, 12, 30, 0, 0, pytz.UTC), + start=datetime(2023, 1, 1, 0, 0, 0, 0, pytz.utc), + ), + ), ], ) def test_resolve_event_time_filter( @@ -143,6 +213,8 @@ def test_resolve_event_time_filter( incremental_strategy: str, event_time_end: Optional[str], event_time_start: Optional[str], + batch_size: PartitionGrain, + lookback: int, expected_filter: Optional[EventTimeFilter], ) -> None: mocker.patch("dbt.context.providers.BaseResolver._is_incremental").return_value = ( @@ -154,8 +226,8 @@ def test_resolve_event_time_filter( resolver.model.config = mock.MagicMock(NodeConfig) resolver.model.config.materialized = materialized resolver.model.config.incremental_strategy = incremental_strategy - resolver.model.config.batch_size = PartitionGrain.day - resolver.model.config.lookback = 0 + resolver.model.config.batch_size = batch_size + resolver.model.config.lookback = lookback resolver.config.args.EVENT_TIME_END = event_time_end resolver.config.args.EVENT_TIME_START = event_time_start event_time_filter = resolver.resolve_event_time_filter(target=target) From 1313aff9d166e18ba2a7860fa959e629fd09996c Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 5 Sep 2024 11:43:59 -0700 Subject: [PATCH 11/38] Remove extraneous comments and print statements from resolve_event_time_filter --- core/dbt/context/providers.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index a8d4c375082..29dd86ea3c0 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -304,7 +304,6 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF and self.model.config.incremental_strategy == "microbatch" ): is_incremental = self._is_incremental() - print(f"####------#### is_incremental: {is_incremental}") end = getattr(self.config.args, "EVENT_TIME_END", None) end = ( datetime.strptime(end, "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC) @@ -326,12 +325,6 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF end=end, ) - # Microbatch debugging - # if event_time_filter is not None: - # print(event_time_filter.render()) - # else: - # print("No filter present! Oh no!") - return event_time_filter @abc.abstractmethod From 7307c028388c06e1ce1d8cefd72f7d12e76d4a72 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 5 Sep 2024 13:18:06 -0700 Subject: [PATCH 12/38] Fixup microbatch functional tests to use microbatch strategy --- tests/functional/microbatch/test_microbatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index f4c341decde..df32e4a00ac 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -14,7 +14,7 @@ """ microbatch_model_sql = """ -{{ config(materialized='incremental', incremental_strategy='merge', unique_key='id', event_time='event_time', batch_size='day') }} +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} select * from {{ ref('input_model') }} """ From 838a0aae7a0a8a8a6524a009c73f05f9a5ab520b Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 5 Sep 2024 15:20:31 -0700 Subject: [PATCH 13/38] Gate microbatch functionality behind env_var while in beta --- core/dbt/context/providers.py | 3 ++- tests/functional/microbatch/test_microbatch.py | 5 +++++ tests/unit/context/test_providers.py | 2 ++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 29dd86ea3c0..24663cd5974 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -298,7 +298,8 @@ def _is_incremental(self) -> bool: def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]: event_time_filter = None if ( - isinstance(target.config, NodeConfig) + os.environ.get("DBT_EXPERIMENTAL_MICROBATCH") + and isinstance(target.config, NodeConfig) and target.config.event_time and self.model.config.materialized == "incremental" and self.model.config.incremental_strategy == "microbatch" diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index df32e4a00ac..0d65702b44a 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -1,3 +1,6 @@ +import os +from unittest import mock + import pytest from freezegun import freeze_time @@ -37,6 +40,7 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int) assert result[0] == expected_row_count + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run without --event-time-start or --event-time-end - 3 expected rows in output run_dbt(["run"]) @@ -82,6 +86,7 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int) assert result[0] == expected_row_count + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # initial run -- backfills all data with freeze_time("2020-01-01 13:57:00"): diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index f835a93751e..09dd8fff901 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -1,3 +1,4 @@ +import os from datetime import datetime from typing import Optional from unittest import mock @@ -42,6 +43,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): assert resolver.resolve_limit == expected_resolve_limit @freeze_time("2024-09-05 08:56:00") + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) @pytest.mark.parametrize( "is_incremental,materialized,incremental_strategy,event_time_end,event_time_start,batch_size,lookback,expected_filter", [ From 43715de002c56663c2f790a5cc1fd7eaa4fe492b Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 5 Sep 2024 18:09:04 -0700 Subject: [PATCH 14/38] Add comment about how _is_incremental should be removed --- core/dbt/context/providers.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 24663cd5974..7e96277b55d 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -282,6 +282,8 @@ def _build_start_time( return start def _is_incremental(self) -> bool: + # TODO: Remove. This is a temporary method. We're working with adapters on + # a strategy to ensure we can access the `is_incremental` logic without drift relation_info = self.Relation.create_from(self.config, self.model) relation = self.db_wrapper.get_relation( relation_info.database, relation_info.schema, relation_info.name From e38ff471d72bfe78fa4b8bc8c8574a07bcde2e29 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 5 Sep 2024 18:24:38 -0700 Subject: [PATCH 15/38] Improve `event_time_start/end` cli parameters to auto convert to datetime objects --- core/dbt/cli/params.py | 4 ++-- core/dbt/context/providers.py | 8 ++++---- tests/unit/context/test_providers.py | 24 ++++++++++++------------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/core/dbt/cli/params.py b/core/dbt/cli/params.py index 7674798475d..5c2ee81691b 100644 --- a/core/dbt/cli/params.py +++ b/core/dbt/cli/params.py @@ -96,7 +96,7 @@ envvar="DBT_EVENT_TIME_END", # TODO: improve help text help="upper bound to filter refs on", - type=click.STRING, + type=click.DateTime(), default=None, ) @@ -105,7 +105,7 @@ envvar="DBT_EVENT_TIME_START", # TODO: improve help text help="lower bound to filter refs on", - type=click.STRING, + type=click.DateTime(), default=None, ) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 7e96277b55d..b6a77cd5359 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -307,16 +307,16 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF and self.model.config.incremental_strategy == "microbatch" ): is_incremental = self._is_incremental() - end = getattr(self.config.args, "EVENT_TIME_END", None) + end: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_END", None) end = ( - datetime.strptime(end, "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC) + end.replace(tzinfo=pytz.UTC) if end else self._build_end_time(is_incremental=is_incremental) ) - start = getattr(self.config.args, "EVENT_TIME_START", None) + start: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_START", None) start = ( - datetime.strptime(start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC) + start.replace(tzinfo=pytz.UTC) if start else self._build_start_time(checkpoint=end, is_incremental=is_incremental) ) diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index 09dd8fff901..60fff46d00a 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -67,7 +67,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): True, "incremental", "microbatch", - "2024-08-01 08:11:00", + datetime(2024, 8, 1, 8, 11, 0), None, PartitionGrain.day, 0, @@ -82,7 +82,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "incremental", "microbatch", None, - "2024-08-01 00:00:00", + datetime(2024, 8, 1), PartitionGrain.day, 0, EventTimeFilter( @@ -95,8 +95,8 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): True, "incremental", "microbatch", - "2024-09-01 00:00:00", - "2024-08-01 00:00:00", + datetime(2024, 9, 1), + datetime(2024, 8, 1), PartitionGrain.day, 0, EventTimeFilter( @@ -110,7 +110,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): False, "incremental", "microbatch", - "2024-08-01 08:11:00", + datetime(2024, 8, 1, 8, 11, 0), None, PartitionGrain.day, 0, @@ -125,7 +125,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "incremental", "microbatch", None, - "2024-08-01 00:00:00", + datetime(2024, 8, 1), PartitionGrain.day, 0, EventTimeFilter( @@ -138,8 +138,8 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): False, "incremental", "microbatch", - "2024-09-01 00:00:00", - "2024-08-01 00:00:00", + datetime(2024, 9, 1), + datetime(2024, 8, 1), PartitionGrain.day, 0, EventTimeFilter( @@ -152,7 +152,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): True, "incremental", "microbatch", - "2024-09-01 00:49:00", + datetime(2024, 9, 1, 0, 49, 0), None, PartitionGrain.hour, 1, @@ -166,7 +166,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): True, "incremental", "microbatch", - "2024-09-01 13:31:00", + datetime(2024, 9, 1, 13, 31, 0), None, PartitionGrain.day, 1, @@ -180,7 +180,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): True, "incremental", "microbatch", - "2024-01-23 12:30:00", + datetime(2024, 1, 23, 12, 30, 0), None, PartitionGrain.month, 1, @@ -194,7 +194,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): True, "incremental", "microbatch", - "2024-01-23 12:30:00", + datetime(2024, 1, 23, 12, 30, 0), None, PartitionGrain.year, 1, From 457698ccd8e3fc69bda9577b027be49ad6bff90b Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Fri, 6 Sep 2024 10:05:45 -0400 Subject: [PATCH 16/38] for testing: dbt-postgres 'microbatch' strategy --- dev-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index c4746851761..019143f67ea 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,7 +1,7 @@ git+https://github.com/dbt-labs/dbt-adapters.git@event-time-ref-filtering git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git@main -git+https://github.com/dbt-labs/dbt-postgres.git@main +git+https://github.com/dbt-labs/dbt-postgres.git@poc-microbatch-merge # black must match what's in .pre-commit-config.yaml to be sure local env matches CI black==24.3.0 bumpversion From 3a4fa7fe82bbce3e1438ea509835f58045d13224 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 9 Sep 2024 10:20:33 -0500 Subject: [PATCH 17/38] Make event_time model configs `Any` so as to not break people already using those names --- core/dbt/artifacts/resources/v1/config.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/dbt/artifacts/resources/v1/config.py b/core/dbt/artifacts/resources/v1/config.py index 45fc8ebeb3f..e6cd26ec823 100644 --- a/core/dbt/artifacts/resources/v1/config.py +++ b/core/dbt/artifacts/resources/v1/config.py @@ -7,7 +7,7 @@ from dbt import hooks from dbt.artifacts.resources.base import Docs -from dbt.artifacts.resources.types import ModelHookType, PartitionGrain +from dbt.artifacts.resources.types import ModelHookType from dbt.artifacts.utils.validation import validate_color from dbt_common.contracts.config.base import BaseConfig, CompareBehavior, MergeBehavior from dbt_common.contracts.config.materialization import OnConfigurationChangeOption @@ -80,8 +80,8 @@ class NodeConfig(NodeAndTestConfig): # 'mergebehavior' dictionary materialized: str = "view" incremental_strategy: Optional[str] = None - batch_size: Optional[PartitionGrain] = None - lookback: int = 0 + batch_size: Any = None + lookback: Any = 0 persist_docs: Dict[str, Any] = field(default_factory=dict) post_hook: List[Hook] = field( default_factory=list, @@ -124,8 +124,7 @@ class NodeConfig(NodeAndTestConfig): default_factory=ContractConfig, metadata=MergeBehavior.Update.meta(), ) - # TODO: does this need to be Any? - event_time: Optional[str] = None + event_time: Any = None def __post_init__(self): # we validate that node_color has a suitable value to prevent dbt-docs from crashing From 8cc1c7dae722809b3dd78e69eb33b2d56dfaf54c Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 9 Sep 2024 12:30:40 -0500 Subject: [PATCH 18/38] Rename `PartitionGrain` to `BatchSize` --- core/dbt/artifacts/resources/types.py | 2 +- core/dbt/context/providers.py | 16 +++++++------- tests/unit/context/test_providers.py | 32 +++++++++++++-------------- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/core/dbt/artifacts/resources/types.py b/core/dbt/artifacts/resources/types.py index 737a123f136..bac25bd2e0e 100644 --- a/core/dbt/artifacts/resources/types.py +++ b/core/dbt/artifacts/resources/types.py @@ -70,7 +70,7 @@ def plural(self) -> str: return str(self) + "s" -class PartitionGrain(StrEnum): +class BatchSize(StrEnum): hour = "hour" day = "day" month = "month" diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index b6a77cd5359..e61cc3eab78 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -31,7 +31,7 @@ get_adapter_type_names, ) from dbt.artifacts.resources import NodeConfig, NodeVersion, RefArgs -from dbt.artifacts.resources.types import PartitionGrain +from dbt.artifacts.resources.types import BatchSize from dbt.clients.jinja import ( MacroGenerator, MacroStack, @@ -247,13 +247,13 @@ def _build_start_time( return None assert isinstance(self.model.config, NodeConfig) - grain = self.model.config.batch_size - if grain is None: + batch_size = self.model.config.batch_size + if batch_size is None: # TODO: Better error message - raise DbtRuntimeError("Partition grain not specified") + raise DbtRuntimeError("Partition batch_size not specified") lookback = self.model.config.lookback - if grain == PartitionGrain.hour: + if batch_size == BatchSize.hour: start = datetime( checkpoint.year, checkpoint.month, @@ -264,16 +264,16 @@ def _build_start_time( 0, pytz.utc, ) - timedelta(hours=lookback) - elif grain == PartitionGrain.day: + elif batch_size == BatchSize.day: start = datetime( checkpoint.year, checkpoint.month, checkpoint.day, 0, 0, 0, 0, pytz.utc ) - timedelta(days=lookback) - elif grain == PartitionGrain.month: + elif batch_size == BatchSize.month: start = datetime(checkpoint.year, checkpoint.month, 1, 0, 0, 0, 0, pytz.utc) for _ in range(lookback): start = start - timedelta(days=1) start = datetime(start.year, start.month, 1, 0, 0, 0, 0, pytz.utc) - elif grain == PartitionGrain.year: + elif batch_size == BatchSize.year: start = datetime(checkpoint.year - lookback, 1, 1, 0, 0, 0, 0, pytz.utc) else: # TODO: Better error message diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index 60fff46d00a..0a0acb4d550 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -10,7 +10,7 @@ from dbt.adapters.base import BaseRelation from dbt.artifacts.resources import NodeConfig, Quoting -from dbt.artifacts.resources.types import PartitionGrain +from dbt.artifacts.resources.types import BatchSize from dbt.context.providers import ( BaseResolver, EventTimeFilter, @@ -47,15 +47,15 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): @pytest.mark.parametrize( "is_incremental,materialized,incremental_strategy,event_time_end,event_time_start,batch_size,lookback,expected_filter", [ - (True, "table", "microbatch", None, None, PartitionGrain.day, 0, None), - (True, "incremental", "merge", None, None, PartitionGrain.day, 0, None), + (True, "table", "microbatch", None, None, BatchSize.day, 0, None), + (True, "incremental", "merge", None, None, BatchSize.day, 0, None), ( True, "incremental", "microbatch", None, None, - PartitionGrain.day, + BatchSize.day, 0, EventTimeFilter( field_name="created_at", @@ -69,7 +69,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", datetime(2024, 8, 1, 8, 11, 0), None, - PartitionGrain.day, + BatchSize.day, 0, EventTimeFilter( field_name="created_at", @@ -83,7 +83,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", None, datetime(2024, 8, 1), - PartitionGrain.day, + BatchSize.day, 0, EventTimeFilter( field_name="created_at", @@ -97,7 +97,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", datetime(2024, 9, 1), datetime(2024, 8, 1), - PartitionGrain.day, + BatchSize.day, 0, EventTimeFilter( field_name="created_at", @@ -105,14 +105,14 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), ), ), - (False, "incremental", "microbatch", None, None, PartitionGrain.day, 0, None), + (False, "incremental", "microbatch", None, None, BatchSize.day, 0, None), ( False, "incremental", "microbatch", datetime(2024, 8, 1, 8, 11, 0), None, - PartitionGrain.day, + BatchSize.day, 0, EventTimeFilter( field_name="created_at", @@ -126,7 +126,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", None, datetime(2024, 8, 1), - PartitionGrain.day, + BatchSize.day, 0, EventTimeFilter( field_name="created_at", @@ -140,7 +140,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", datetime(2024, 9, 1), datetime(2024, 8, 1), - PartitionGrain.day, + BatchSize.day, 0, EventTimeFilter( field_name="created_at", @@ -154,7 +154,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", datetime(2024, 9, 1, 0, 49, 0), None, - PartitionGrain.hour, + BatchSize.hour, 1, EventTimeFilter( field_name="created_at", @@ -168,7 +168,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", datetime(2024, 9, 1, 13, 31, 0), None, - PartitionGrain.day, + BatchSize.day, 1, EventTimeFilter( field_name="created_at", @@ -182,7 +182,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", datetime(2024, 1, 23, 12, 30, 0), None, - PartitionGrain.month, + BatchSize.month, 1, EventTimeFilter( field_name="created_at", @@ -196,7 +196,7 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): "microbatch", datetime(2024, 1, 23, 12, 30, 0), None, - PartitionGrain.year, + BatchSize.year, 1, EventTimeFilter( field_name="created_at", @@ -215,7 +215,7 @@ def test_resolve_event_time_filter( incremental_strategy: str, event_time_end: Optional[str], event_time_start: Optional[str], - batch_size: PartitionGrain, + batch_size: BatchSize, lookback: int, expected_filter: Optional[EventTimeFilter], ) -> None: From 1b0a904d3db7154f3d4c94f6738f19e53bf178c8 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 9 Sep 2024 13:06:02 -0500 Subject: [PATCH 19/38] fix error message about batch size --- core/dbt/context/providers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index e61cc3eab78..063578b81c6 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -250,7 +250,7 @@ def _build_start_time( batch_size = self.model.config.batch_size if batch_size is None: # TODO: Better error message - raise DbtRuntimeError("Partition batch_size not specified") + raise DbtRuntimeError("Batch size not specified") lookback = self.model.config.lookback if batch_size == BatchSize.hour: From 7522b9bc44d2dd0655740dc3b7b80f4dcf03eb0a Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 9 Sep 2024 14:07:52 -0500 Subject: [PATCH 20/38] Create unit test to check values which gate off event_time usage --- tests/unit/context/test_providers.py | 79 +++++++++++++++++----------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index 0a0acb4d550..27fe2b8c544 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -42,17 +42,58 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): assert resolver.resolve_limit == expected_resolve_limit + @pytest.mark.parametrize( + "dbt_experimental_microbatch,materialized,incremental_strategy,expect_filter", + [ + (True, "incremental", "microbatch", True), + (False, "incremental", "microbatch", False), + (True, "table", "microbatch", False), + (True, "incremental", "merge", False), + ], + ) + def test_resolve_event_time_filter_gating( + self, + mocker: MockerFixture, + resolver: ResolverSubclass, + dbt_experimental_microbatch: bool, + materialized: str, + incremental_strategy: str, + expect_filter: bool, + ) -> None: + if dbt_experimental_microbatch: + mocker.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + + mocker.patch("dbt.context.providers.BaseResolver._is_incremental").return_value = True + + # Target mocking + target = mock.Mock() + target.config = mock.MagicMock(NodeConfig) + target.config.event_time = "created_at" + + # Resolver mocking + resolver.config.args.EVENT_TIME_END = None + resolver.config.args.EVENT_TIME_START = None + resolver.model.config = mock.MagicMock(NodeConfig) + resolver.model.config.materialized = materialized + resolver.model.config.incremental_strategy = incremental_strategy + resolver.model.config.batch_size = BatchSize.day + resolver.model.config.lookback = 0 + + # Try to get an EventTimeFilter + event_time_filter = resolver.resolve_event_time_filter(target=target) + + if expect_filter: + assert isinstance(event_time_filter, EventTimeFilter) + else: + assert event_time_filter is None + @freeze_time("2024-09-05 08:56:00") @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) @pytest.mark.parametrize( - "is_incremental,materialized,incremental_strategy,event_time_end,event_time_start,batch_size,lookback,expected_filter", + "is_incremental,event_time_end,event_time_start,batch_size,lookback,expected_filter", [ - (True, "table", "microbatch", None, None, BatchSize.day, 0, None), - (True, "incremental", "merge", None, None, BatchSize.day, 0, None), ( True, - "incremental", - "microbatch", None, None, BatchSize.day, @@ -65,8 +106,6 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): ), ( True, - "incremental", - "microbatch", datetime(2024, 8, 1, 8, 11, 0), None, BatchSize.day, @@ -79,8 +118,6 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): ), ( True, - "incremental", - "microbatch", None, datetime(2024, 8, 1), BatchSize.day, @@ -93,8 +130,6 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): ), ( True, - "incremental", - "microbatch", datetime(2024, 9, 1), datetime(2024, 8, 1), BatchSize.day, @@ -105,11 +140,9 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), ), ), - (False, "incremental", "microbatch", None, None, BatchSize.day, 0, None), + (False, None, None, BatchSize.day, 0, None), ( False, - "incremental", - "microbatch", datetime(2024, 8, 1, 8, 11, 0), None, BatchSize.day, @@ -122,8 +155,6 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): ), ( False, - "incremental", - "microbatch", None, datetime(2024, 8, 1), BatchSize.day, @@ -136,8 +167,6 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): ), ( False, - "incremental", - "microbatch", datetime(2024, 9, 1), datetime(2024, 8, 1), BatchSize.day, @@ -150,8 +179,6 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): ), ( True, - "incremental", - "microbatch", datetime(2024, 9, 1, 0, 49, 0), None, BatchSize.hour, @@ -164,8 +191,6 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): ), ( True, - "incremental", - "microbatch", datetime(2024, 9, 1, 13, 31, 0), None, BatchSize.day, @@ -178,8 +203,6 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): ), ( True, - "incremental", - "microbatch", datetime(2024, 1, 23, 12, 30, 0), None, BatchSize.month, @@ -192,8 +215,6 @@ def test_resolve_limit(self, resolver, empty, expected_resolve_limit): ), ( True, - "incremental", - "microbatch", datetime(2024, 1, 23, 12, 30, 0), None, BatchSize.year, @@ -211,8 +232,6 @@ def test_resolve_event_time_filter( mocker: MockerFixture, resolver: ResolverSubclass, is_incremental: bool, - materialized: str, - incremental_strategy: str, event_time_end: Optional[str], event_time_start: Optional[str], batch_size: BatchSize, @@ -226,8 +245,8 @@ def test_resolve_event_time_filter( target.config = mock.MagicMock(NodeConfig) target.config.event_time = "created_at" resolver.model.config = mock.MagicMock(NodeConfig) - resolver.model.config.materialized = materialized - resolver.model.config.incremental_strategy = incremental_strategy + resolver.model.config.materialized = "incremental" + resolver.model.config.incremental_strategy = "microbatch" resolver.model.config.batch_size = batch_size resolver.model.config.lookback = lookback resolver.config.args.EVENT_TIME_END = event_time_end From 5f72534e911d45022a386ddcd02c6c4dafcaf8d5 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 9 Sep 2024 14:31:43 -0500 Subject: [PATCH 21/38] Add unit test case in regard to event_tiem filtering when `is_incremental` is false --- tests/unit/context/test_providers.py | 96 +++++++++++++--------------- 1 file changed, 46 insertions(+), 50 deletions(-) diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index 27fe2b8c544..aebfbc688f1 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -90,10 +90,52 @@ def test_resolve_event_time_filter_gating( @freeze_time("2024-09-05 08:56:00") @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) @pytest.mark.parametrize( - "is_incremental,event_time_end,event_time_start,batch_size,lookback,expected_filter", + "event_time_end,event_time_start,expect_filter", + [ + (None, None, False), + (datetime(2024, 9, 5), None, True), + (None, datetime(2024, 9, 4), True), + (datetime(2024, 9, 5), datetime(2024, 9, 4), True), + ], + ) + def test_event_time_filtering_is_incremental_false( + self, + mocker: MockerFixture, + resolver: ResolverSubclass, + event_time_end: datetime, + event_time_start: datetime, + expect_filter: bool, + ) -> None: + mocker.patch("dbt.context.providers.BaseResolver._is_incremental").return_value = False + + # Target mocking + target = mock.Mock() + target.config = mock.MagicMock(NodeConfig) + target.config.event_time = "created_at" + + # Resolver mocking + resolver.config.args.EVENT_TIME_END = event_time_end + resolver.config.args.EVENT_TIME_START = event_time_start + resolver.model.config = mock.MagicMock(NodeConfig) + resolver.model.config.materialized = "incremental" + resolver.model.config.incremental_strategy = "microbatch" + resolver.model.config.batch_size = BatchSize.day + resolver.model.config.lookback = 0 + + # Try to get an EventTimeFilter + event_time_filter = resolver.resolve_event_time_filter(target=target) + + if expect_filter: + assert isinstance(event_time_filter, EventTimeFilter) + else: + assert event_time_filter is None + + @freeze_time("2024-09-05 08:56:00") + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + @pytest.mark.parametrize( + "event_time_end,event_time_start,batch_size,lookback,expected_filter", [ ( - True, None, None, BatchSize.day, @@ -105,7 +147,6 @@ def test_resolve_event_time_filter_gating( ), ), ( - True, datetime(2024, 8, 1, 8, 11, 0), None, BatchSize.day, @@ -117,7 +158,6 @@ def test_resolve_event_time_filter_gating( ), ), ( - True, None, datetime(2024, 8, 1), BatchSize.day, @@ -129,7 +169,6 @@ def test_resolve_event_time_filter_gating( ), ), ( - True, datetime(2024, 9, 1), datetime(2024, 8, 1), BatchSize.day, @@ -140,45 +179,7 @@ def test_resolve_event_time_filter_gating( start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), ), ), - (False, None, None, BatchSize.day, 0, None), ( - False, - datetime(2024, 8, 1, 8, 11, 0), - None, - BatchSize.day, - 0, - EventTimeFilter( - field_name="created_at", - end=datetime(2024, 8, 1, 8, 11, 0, 0, pytz.UTC), - start=None, - ), - ), - ( - False, - None, - datetime(2024, 8, 1), - BatchSize.day, - 0, - EventTimeFilter( - field_name="created_at", - end=None, - start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), - ), - ), - ( - False, - datetime(2024, 9, 1), - datetime(2024, 8, 1), - BatchSize.day, - 0, - EventTimeFilter( - field_name="created_at", - end=datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), - start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), - ), - ), - ( - True, datetime(2024, 9, 1, 0, 49, 0), None, BatchSize.hour, @@ -190,7 +191,6 @@ def test_resolve_event_time_filter_gating( ), ), ( - True, datetime(2024, 9, 1, 13, 31, 0), None, BatchSize.day, @@ -202,7 +202,6 @@ def test_resolve_event_time_filter_gating( ), ), ( - True, datetime(2024, 1, 23, 12, 30, 0), None, BatchSize.month, @@ -214,7 +213,6 @@ def test_resolve_event_time_filter_gating( ), ), ( - True, datetime(2024, 1, 23, 12, 30, 0), None, BatchSize.year, @@ -231,16 +229,14 @@ def test_resolve_event_time_filter( self, mocker: MockerFixture, resolver: ResolverSubclass, - is_incremental: bool, event_time_end: Optional[str], event_time_start: Optional[str], batch_size: BatchSize, lookback: int, expected_filter: Optional[EventTimeFilter], ) -> None: - mocker.patch("dbt.context.providers.BaseResolver._is_incremental").return_value = ( - is_incremental - ) + mocker.patch("dbt.context.providers.BaseResolver._is_incremental").return_value = True + target = mock.Mock() target.config = mock.MagicMock(NodeConfig) target.config.event_time = "created_at" From a8b9d64438d99feca7f2fbf06e37524d518982e4 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 9 Sep 2024 14:46:39 -0500 Subject: [PATCH 22/38] Cleanup unit test for test_resolve_event_time_filter_batch_calculation --- tests/unit/context/test_providers.py | 97 +++++++++++----------------- 1 file changed, 39 insertions(+), 58 deletions(-) diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index aebfbc688f1..c68579f2894 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -133,113 +133,95 @@ def test_event_time_filtering_is_incremental_false( @freeze_time("2024-09-05 08:56:00") @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) @pytest.mark.parametrize( - "event_time_end,event_time_start,batch_size,lookback,expected_filter", + "event_time_end,event_time_start,batch_size,lookback,expected_end,expected_start", [ ( None, None, BatchSize.day, 0, - EventTimeFilter( - field_name="created_at", - end=datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC), - start=datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC), - ), + datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC), + datetime(2024, 9, 5, 0, 0, 0, 0, pytz.UTC), ), ( - datetime(2024, 8, 1, 8, 11, 0), + datetime(2024, 8, 1, 8, 11), None, BatchSize.day, 0, - EventTimeFilter( - field_name="created_at", - end=datetime(2024, 8, 1, 8, 11, 0, 0, pytz.UTC), - start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), - ), + datetime(2024, 8, 1, 8, 11, 0, 0, pytz.UTC), + datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), ), ( None, datetime(2024, 8, 1), BatchSize.day, 0, - EventTimeFilter( - field_name="created_at", - end=datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC), - start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), - ), + datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC), + datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), ), ( datetime(2024, 9, 1), datetime(2024, 8, 1), BatchSize.day, 0, - EventTimeFilter( - field_name="created_at", - end=datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), - start=datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), - ), + datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), + datetime(2024, 8, 1, 0, 0, 0, 0, pytz.UTC), ), ( - datetime(2024, 9, 1, 0, 49, 0), + datetime(2024, 9, 1, 0, 49), None, BatchSize.hour, 1, - EventTimeFilter( - field_name="created_at", - end=datetime(2024, 9, 1, 0, 49, 0, 0, pytz.UTC), - start=datetime(2024, 8, 31, 23, 0, 0, 0, pytz.UTC), - ), + datetime(2024, 9, 1, 0, 49, 0, 0, pytz.UTC), + datetime(2024, 8, 31, 23, 0, 0, 0, pytz.UTC), ), ( - datetime(2024, 9, 1, 13, 31, 0), + datetime(2024, 9, 1, 13, 31), None, BatchSize.day, 1, - EventTimeFilter( - field_name="created_at", - end=datetime(2024, 9, 1, 13, 31, 0, 0, pytz.UTC), - start=datetime(2024, 8, 31, 0, 0, 0, 0, pytz.UTC), - ), + datetime(2024, 9, 1, 13, 31, 0, 0, pytz.UTC), + datetime(2024, 8, 31, 0, 0, 0, 0, pytz.UTC), ), ( - datetime(2024, 1, 23, 12, 30, 0), + datetime(2024, 1, 23, 12, 30), None, BatchSize.month, 1, - EventTimeFilter( - field_name="created_at", - end=datetime(2024, 1, 23, 12, 30, 0, 0, pytz.UTC), - start=datetime(2023, 12, 1, 0, 0, 0, 0, pytz.utc), - ), + datetime(2024, 1, 23, 12, 30, 0, 0, pytz.UTC), + datetime(2023, 12, 1, 0, 0, 0, 0, pytz.UTC), ), ( - datetime(2024, 1, 23, 12, 30, 0), + datetime(2024, 1, 23, 12, 30), None, BatchSize.year, 1, - EventTimeFilter( - field_name="created_at", - end=datetime(2024, 1, 23, 12, 30, 0, 0, pytz.UTC), - start=datetime(2023, 1, 1, 0, 0, 0, 0, pytz.utc), - ), + datetime(2024, 1, 23, 12, 30, 0, 0, pytz.UTC), + datetime(2023, 1, 1, 0, 0, 0, 0, pytz.UTC), ), ], ) - def test_resolve_event_time_filter( + def test_resolve_event_time_filter_batch_calculation( self, mocker: MockerFixture, resolver: ResolverSubclass, - event_time_end: Optional[str], - event_time_start: Optional[str], + event_time_end: Optional[datetime], + event_time_start: Optional[datetime], batch_size: BatchSize, lookback: int, - expected_filter: Optional[EventTimeFilter], + expected_end: datetime, + expected_start: datetime, ) -> None: + event_time = "created_at" + mocker.patch("dbt.context.providers.BaseResolver._is_incremental").return_value = True + # Target mocking target = mock.Mock() target.config = mock.MagicMock(NodeConfig) - target.config.event_time = "created_at" + target.config.event_time = event_time + + # Resolver mocking resolver.model.config = mock.MagicMock(NodeConfig) resolver.model.config.materialized = "incremental" resolver.model.config.incremental_strategy = "microbatch" @@ -247,15 +229,14 @@ def test_resolve_event_time_filter( resolver.model.config.lookback = lookback resolver.config.args.EVENT_TIME_END = event_time_end resolver.config.args.EVENT_TIME_START = event_time_start + + # Get EventTimeFilter event_time_filter = resolver.resolve_event_time_filter(target=target) - if expected_filter is not None: - assert event_time_filter is not None - assert event_time_filter.field_name == expected_filter.field_name - assert event_time_filter.end == expected_filter.end - assert event_time_filter.start == expected_filter.start - else: - assert event_time_filter is None + assert event_time_filter is not None + assert event_time_filter.field_name == event_time + assert event_time_filter.end == expected_end + assert event_time_filter.start == expected_start class TestRuntimeRefResolver: From b85d50eef23464d89bed52788d39aee023751e84 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 10 Sep 2024 10:17:55 -0500 Subject: [PATCH 23/38] Update lower bound of dbt-adapters req to 1.5.0 Previously our lower bound was 1.3.0. This will no longer work. In dbt-adapters 1.5.0 we released the EventTimeFilter class, and dbt-core now depends on that. Thus dbt-adapters < 1.5.0 are no longer compatible. --- core/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/setup.py b/core/setup.py index 904b23afeb6..1c6b9e6c575 100644 --- a/core/setup.py +++ b/core/setup.py @@ -72,7 +72,7 @@ "dbt-semantic-interfaces>=0.7.0,<0.8", # Minor versions for these are expected to be backwards-compatible "dbt-common>=1.6.0,<2.0", - "dbt-adapters>=1.3.0,<2.0", + "dbt-adapters>=1.5.0,<2.0", # ---- # Expect compatibility with all new versions of these packages, so lower bounds only. "packaging>20.9", From ec011296702c243368d0cf7ac7b1af05f352e92c Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 10 Sep 2024 10:19:08 -0500 Subject: [PATCH 24/38] Update dev-reqs now that the changes we need have been merged to main in dbt-adapters --- dev-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 019143f67ea..8fee97deab2 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,4 +1,4 @@ -git+https://github.com/dbt-labs/dbt-adapters.git@event-time-ref-filtering +git+https://github.com/dbt-labs/dbt-adapters.git@main git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git@main git+https://github.com/dbt-labs/dbt-postgres.git@poc-microbatch-merge From 8f8e7e3ffe5b361a1f88a85f6d465f0f7606f86b Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 10 Sep 2024 10:45:19 -0500 Subject: [PATCH 25/38] Remove `is_incremental` check from `_build_end_time` Even when `is_incremental` is false, we still plan to _microbatch_ incremental microbatch models. To do so we need to have an end time. Additionally, some tests were wonky because of the previous behavior --- core/dbt/context/providers.py | 13 +++---------- tests/functional/microbatch/test_microbatch.py | 2 +- tests/unit/context/test_providers.py | 2 +- 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 063578b81c6..e1e57f051d6 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -234,11 +234,8 @@ def Relation(self): def resolve_limit(self) -> Optional[int]: return 0 if getattr(self.config.args, "EMPTY", False) else None - def _build_end_time(self, is_incremental: bool) -> Optional[datetime]: - if not is_incremental: - return None - else: - return datetime.now(tz=pytz.utc) + def _build_end_time(self) -> Optional[datetime]: + return datetime.now(tz=pytz.utc) def _build_start_time( self, checkpoint: Optional[datetime], is_incremental: bool @@ -308,11 +305,7 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF ): is_incremental = self._is_incremental() end: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_END", None) - end = ( - end.replace(tzinfo=pytz.UTC) - if end - else self._build_end_time(is_incremental=is_incremental) - ) + end = end.replace(tzinfo=pytz.UTC) if end else self._build_end_time() start: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_START", None) start = ( diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 0d65702b44a..fbb95528f9d 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -89,7 +89,7 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int) @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # initial run -- backfills all data - with freeze_time("2020-01-01 13:57:00"): + with freeze_time("2020-01-03 13:57:00"): run_dbt(["run"]) self.assert_row_count(project, "microbatch_model", 3) diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index c68579f2894..97f04eba817 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -92,7 +92,7 @@ def test_resolve_event_time_filter_gating( @pytest.mark.parametrize( "event_time_end,event_time_start,expect_filter", [ - (None, None, False), + (None, None, True), (datetime(2024, 9, 5), None, True), (None, datetime(2024, 9, 4), True), (datetime(2024, 9, 5), datetime(2024, 9, 4), True), From 2b80f7546cb826c9c840c5bb64a41289b5c022d1 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 10 Sep 2024 10:52:27 -0500 Subject: [PATCH 26/38] Improve help messages for `event_time_start/end` CLI flags --- core/dbt/cli/params.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/dbt/cli/params.py b/core/dbt/cli/params.py index 5c2ee81691b..425009d76ee 100644 --- a/core/dbt/cli/params.py +++ b/core/dbt/cli/params.py @@ -94,8 +94,7 @@ event_time_end = click.option( "--event-time-end", envvar="DBT_EVENT_TIME_END", - # TODO: improve help text - help="upper bound to filter refs on", + help="If specified, the end datetime dbt uses to filter microbatch model inputs (exclusive).", type=click.DateTime(), default=None, ) @@ -103,8 +102,7 @@ event_time_start = click.option( "--event-time-start", envvar="DBT_EVENT_TIME_START", - # TODO: improve help text - help="lower bound to filter refs on", + help="If specified, the start datetime dbt uses to filter microbatch model inputs (inclusive).", type=click.DateTime(), default=None, ) From 0eb8e2f6264f36efc551648a53a13df8a9abf807 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 10 Sep 2024 10:53:14 -0500 Subject: [PATCH 27/38] Improve error handling messages for event time filtering generation --- core/dbt/context/providers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index e1e57f051d6..73a9fd50b82 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -246,8 +246,7 @@ def _build_start_time( assert isinstance(self.model.config, NodeConfig) batch_size = self.model.config.batch_size if batch_size is None: - # TODO: Better error message - raise DbtRuntimeError("Batch size not specified") + raise DbtRuntimeError(f"The model `{self.model.name}` requires a `batch_size`") lookback = self.model.config.lookback if batch_size == BatchSize.hour: @@ -273,8 +272,9 @@ def _build_start_time( elif batch_size == BatchSize.year: start = datetime(checkpoint.year - lookback, 1, 1, 0, 0, 0, 0, pytz.utc) else: - # TODO: Better error message - raise DbtInternalError("This should be impossible :eeek:") + raise DbtInternalError( + f"Batch size `{batch_size}` is not handled during batch calculation" + ) return start From 1690d814959124828b0d21a670bb7ce6e88cbd51 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 11 Sep 2024 12:04:27 -0500 Subject: [PATCH 28/38] Delete pairing.md file --- pairing.md | 21 --------------------- 1 file changed, 21 deletions(-) delete mode 100644 pairing.md diff --git a/pairing.md b/pairing.md deleted file mode 100644 index 0302bfd8e2b..00000000000 --- a/pairing.md +++ /dev/null @@ -1,21 +0,0 @@ -### Requirements - -- [p0*] It is possible to configure a top-level event_time key as a top-level property of a model - - TODO: consider backward compatability; perhaps call it an Any and do more aggressive validation + skipping if not str - -- [p0*] It is possible to hard-code a lower and upper bound time window to apply to all microbatch runs within an invocation via CLI flags (--event-start-time, --event-end-time) - - default: open on the left, closed on the right - - 1 < x <= 2 is open on the left, closed on the right - - so t=1, t=5 → update [2,3,4,5] - - TODO: consider a custom click type for parsing datetimes from the command line similar to run error options - -- It is possible to *automatically* read (via `ref` and `source`) just the “new” data for inputs with `event_time` defined in the context of a microbatch model - - [p0*] “New” data is defined by dynamic checkpoints: current_timestamp as upper bound, lower bound as a partition-aware offset of that - -- [p0*] It is possible to configure a “lookback period” that applies to the read window of a microbatch model. - -- [p0*] It is possible to efficiently *write* entire partitions representing the newly computed data for a given microbatch model run. - - https://docs.getdbt.com/docs/build/incremental-strategy - - Target warehouses: - - [p0*]`insert_overwrite` dbt-bigquery, dbt-spark, dbt-databricks - - [p0*]`delete+insert` dbt-snowflake From de87286876172e929c673e1126e7b046aeca4da5 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 11 Sep 2024 12:13:16 -0500 Subject: [PATCH 29/38] Add changie doc for initial microbatch implementation --- .changes/unreleased/Features-20240911-121029.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20240911-121029.yaml diff --git a/.changes/unreleased/Features-20240911-121029.yaml b/.changes/unreleased/Features-20240911-121029.yaml new file mode 100644 index 00000000000..365faf7fadd --- /dev/null +++ b/.changes/unreleased/Features-20240911-121029.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add basic functionality for creating microbatch incremental models +time: 2024-09-11T12:10:29.822189-05:00 +custom: + Author: MichelleArk QMalcolm + Issue: 9490 10635 10637 10638 10636 10662 10639 From 12bfec915f2b8429c2b9b775d25dca6911ca61fb Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 11 Sep 2024 14:48:29 -0500 Subject: [PATCH 30/38] Update v12.json manifest schema for NodeConfig changes --- schemas/dbt/manifest/v12.json | 162 ++++++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) diff --git a/schemas/dbt/manifest/v12.json b/schemas/dbt/manifest/v12.json index dfa5744ce70..d4e0fbc2239 100644 --- a/schemas/dbt/manifest/v12.json +++ b/schemas/dbt/manifest/v12.json @@ -259,6 +259,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -434,6 +440,9 @@ }, "additionalProperties": false }, + "event_time": { + "default": null + }, "delimiter": { "type": "string", "default": "," @@ -909,6 +918,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -1083,6 +1098,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -1283,6 +1301,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -1457,6 +1481,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -2704,6 +2731,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -2878,6 +2911,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -3488,6 +3524,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -3663,6 +3705,9 @@ }, "additionalProperties": false }, + "event_time": { + "default": null + }, "access": { "enum": [ "private", @@ -4399,6 +4444,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -4573,6 +4624,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -4800,6 +4854,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -4974,6 +5034,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -6282,6 +6345,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -6451,6 +6520,9 @@ }, "additionalProperties": false }, + "event_time": { + "default": null + }, "strategy": { "anyOf": [ { @@ -7117,6 +7189,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -7291,6 +7369,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -9874,6 +9955,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -10049,6 +10136,9 @@ }, "additionalProperties": false }, + "event_time": { + "default": null + }, "delimiter": { "type": "string", "default": "," @@ -10524,6 +10614,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -10698,6 +10794,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -10898,6 +10997,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -11072,6 +11177,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -12319,6 +12427,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -12493,6 +12607,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -13103,6 +13220,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -13278,6 +13401,9 @@ }, "additionalProperties": false }, + "event_time": { + "default": null + }, "access": { "enum": [ "private", @@ -14014,6 +14140,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -14188,6 +14320,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -14415,6 +14550,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -14589,6 +14730,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -15897,6 +16041,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -16066,6 +16216,9 @@ }, "additionalProperties": false }, + "event_time": { + "default": null + }, "strategy": { "anyOf": [ { @@ -16732,6 +16885,12 @@ ], "default": null }, + "batch_size": { + "default": null + }, + "lookback": { + "default": 0 + }, "persist_docs": { "type": "object", "propertyNames": { @@ -16906,6 +17065,9 @@ } }, "additionalProperties": false + }, + "event_time": { + "default": null } }, "additionalProperties": true From 9b47966609cd6b05c8db800fb681f556620ddaf3 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 12 Sep 2024 01:17:45 -0500 Subject: [PATCH 31/38] Add `event_time` to `SourceConfig` and allow event time filtering for sources --- .../resources/v1/source_definition.py | 1 + core/dbt/context/providers.py | 4 +- .../functional/microbatch/test_microbatch.py | 94 +++++++++++++++++++ 3 files changed, 97 insertions(+), 2 deletions(-) diff --git a/core/dbt/artifacts/resources/v1/source_definition.py b/core/dbt/artifacts/resources/v1/source_definition.py index ac0fcfca1b2..6c1c3679a00 100644 --- a/core/dbt/artifacts/resources/v1/source_definition.py +++ b/core/dbt/artifacts/resources/v1/source_definition.py @@ -19,6 +19,7 @@ @dataclass class SourceConfig(BaseConfig): enabled: bool = True + event_time: Any = None @dataclass diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 73a9fd50b82..05fecd8423d 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -30,7 +30,7 @@ get_adapter_package_names, get_adapter_type_names, ) -from dbt.artifacts.resources import NodeConfig, NodeVersion, RefArgs +from dbt.artifacts.resources import NodeConfig, NodeVersion, RefArgs, SourceConfig from dbt.artifacts.resources.types import BatchSize from dbt.clients.jinja import ( MacroGenerator, @@ -298,7 +298,7 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF event_time_filter = None if ( os.environ.get("DBT_EXPERIMENTAL_MICROBATCH") - and isinstance(target.config, NodeConfig) + and (isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig)) and target.config.event_time and self.model.config.materialized == "incremental" and self.model.config.incremental_strategy == "microbatch" diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index fbb95528f9d..dd8892eb370 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -21,6 +21,35 @@ select * from {{ ref('input_model') }} """ +seed_csv = """id,event_time +1,'2020-01-01 00:00:00-0' +2,'2020-01-02 00:00:00-0' +3,'2020-01-03 00:00:00-0' +""" + +seeds_yaml = """ +seeds: + - name: raw_source + config: + column_types: + event_time: TIMESTAMP +""" + +sources_yaml = """ +sources: + - name: seed_sources + schema: "{{ target.schema }}" + tables: + - name: raw_source + config: + event_time: event_time +""" + +microbatch_model_calling_source_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +select * from {{ source('seed_sources', 'raw_source') }} +""" + class TestMicrobatchCLI: @pytest.fixture(scope="class") @@ -121,3 +150,68 @@ def test_run_with_event_time(self, project): with freeze_time("2020-01-05 14:57:00"): run_dbt(["run", "--select", "microbatch_model"]) self.assert_row_count(project, "microbatch_model", 5) + + +class TestMicrobatchWithSource: + @pytest.fixture(scope="class") + def seeds(self): + return { + "raw_source.csv": seed_csv, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "microbatch_model.sql": microbatch_model_calling_source_sql, + "sources.yml": sources_yaml, + "seeds.yml": seeds_yaml, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + + if result[0] != expected_row_count: + # running show for debugging + run_dbt(["show", "--inline", f"select * from {relation}"]) + + assert result[0] == expected_row_count + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # ensure seed is created for source + run_dbt(["seed"]) + + # initial run -- backfills all data + with freeze_time("2020-01-03 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # our partition grain is "day" so running the same day without new data should produce the same results + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # add next two days of data + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.run_sql( + f"insert into {test_schema_relation}.raw_source(id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" + ) + self.assert_row_count(project, "raw_source", 5) + + # re-run without changing current time => no insert + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 3) + + # re-run by advancing time by one day changing current time => insert 1 row + with freeze_time("2020-01-04 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 4) + + # re-run by advancing time by one more day changing current time => insert 1 more row + with freeze_time("2020-01-05 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 5) From 676794848c7939d2f70172f0874a5cf934056182 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 12 Sep 2024 01:26:17 -0500 Subject: [PATCH 32/38] Add test that asserts direct upstrean inputs without event_time aren't filtered --- .../functional/microbatch/test_microbatch.py | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index dd8892eb370..0098e090f81 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -16,6 +16,16 @@ select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time """ +input_model_without_event_time_sql = """ +{{ config(materialized='table') }} + +select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time +union all +select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time +union all +select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time +""" + microbatch_model_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} select * from {{ ref('input_model') }} @@ -215,3 +225,48 @@ def test_run_with_event_time(self, project): with freeze_time("2020-01-05 14:57:00"): run_dbt(["run", "--select", "microbatch_model"]) self.assert_row_count(project, "microbatch_model", 5) + + +class TestMicrobatchWithInputWithoutEventTime: + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_without_event_time_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + + if result[0] != expected_row_count: + # running show for debugging + run_dbt(["show", "--inline", f"select * from {relation}"]) + + assert result[0] == expected_row_count + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # initial run -- backfills all data + with freeze_time("2020-01-03 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # our partition grain is "day" so running the same day without new data should produce the same results + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # add next two days of data + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.run_sql( + f"insert into {test_schema_relation}.input_model(id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" + ) + self.assert_row_count(project, "input_model", 5) + + # re-run without changing current time => INSERT BECAUSE INPUT MODEL ISN'T BEING FILTERED + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 5) From 61123cb040835890148ce53cea5665f7a1230f65 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 12 Sep 2024 01:33:40 -0500 Subject: [PATCH 33/38] Add test asserting that calling `.render()` on a ref skips event time filtering --- .../functional/microbatch/test_microbatch.py | 62 ++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 0098e090f81..8bdb5abab2c 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -4,7 +4,7 @@ import pytest from freezegun import freeze_time -from dbt.tests.util import relation_from_name, run_dbt +from dbt.tests.util import relation_from_name, run_dbt, write_file input_model_sql = """ {{ config(materialized='table', event_time='event_time') }} @@ -31,6 +31,11 @@ select * from {{ ref('input_model') }} """ +microbatch_model_ref_render_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +select * from {{ ref('input_model').render() }} +""" + seed_csv = """id,event_time 1,'2020-01-01 00:00:00-0' 2,'2020-01-02 00:00:00-0' @@ -270,3 +275,58 @@ def test_run_with_event_time(self, project): with freeze_time("2020-01-03 14:57:00"): run_dbt(["run", "--select", "microbatch_model"]) self.assert_row_count(project, "microbatch_model", 5) + + +class TestMicrobatchUsingRefRenderSkipsFilter: + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def assert_row_count(self, project, relation_name: str, expected_row_count: int): + relation = relation_from_name(project.adapter, relation_name) + result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") + + if result[0] != expected_row_count: + # running show for debugging + run_dbt(["show", "--inline", f"select * from {relation}"]) + + assert result[0] == expected_row_count + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # initial run -- backfills all data + with freeze_time("2020-01-03 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # our partition grain is "day" so running the same day without new data should produce the same results + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + # add next two days of data + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.run_sql( + f"insert into {test_schema_relation}.input_model(id, event_time) values (4, TIMESTAMP '2020-01-04 00:00:00-0'), (5, TIMESTAMP '2020-01-05 00:00:00-0')" + ) + self.assert_row_count(project, "input_model", 5) + + # re-run without changing current time => no insert + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 3) + + # Update microbatch model to call .render() on ref('input_model') + write_file( + microbatch_model_ref_render_sql, project.project_root, "models", "microbatch_model.sql" + ) + + # re-run without changing current time => INSERT because .render() skips filtering + with freeze_time("2020-01-03 14:57:00"): + run_dbt(["run", "--select", "microbatch_model"]) + self.assert_row_count(project, "microbatch_model", 5) From 43866db72a00bea5c77c64740005c9c0ab373690 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 12 Sep 2024 16:26:16 -0500 Subject: [PATCH 34/38] Update v12 manifest schema with new SourceConfig key for event_time --- schemas/dbt/manifest/v12.json | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/schemas/dbt/manifest/v12.json b/schemas/dbt/manifest/v12.json index d4e0fbc2239..3ff25374066 100644 --- a/schemas/dbt/manifest/v12.json +++ b/schemas/dbt/manifest/v12.json @@ -7954,6 +7954,9 @@ "enabled": { "type": "boolean", "default": true + }, + "event_time": { + "default": null } }, "additionalProperties": true @@ -17641,6 +17644,9 @@ "enabled": { "type": "boolean", "default": true + }, + "event_time": { + "default": null } }, "additionalProperties": true From 5426e663b522cd1a964bc29c11370b8027fefaaa Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 12 Sep 2024 17:28:46 -0400 Subject: [PATCH 35/38] fix test_list for source config event_time --- tests/functional/list/test_list.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/functional/list/test_list.py b/tests/functional/list/test_list.py index fdd838891ab..de20970038a 100644 --- a/tests/functional/list/test_list.py +++ b/tests/functional/list/test_list.py @@ -464,6 +464,7 @@ def expect_source_output(self): "json": { "config": { "enabled": True, + "event_time": None, }, "unique_id": "source.test.my_source.my_table", "original_file_path": normalize("models/schema.yml"), From ee29fed293e142ea78f2200c38ffbde2608281c7 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 12 Sep 2024 17:33:54 -0400 Subject: [PATCH 36/38] restore dbt-postgres dev requirement --- dev-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 8fee97deab2..20605e632b8 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,7 +1,7 @@ git+https://github.com/dbt-labs/dbt-adapters.git@main git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git@main -git+https://github.com/dbt-labs/dbt-postgres.git@poc-microbatch-merge +git+https://github.com/dbt-labs/dbt-postgres.git@main # black must match what's in .pre-commit-config.yaml to be sure local env matches CI black==24.3.0 bumpversion From 3020aa46e44b7b0d3b2dee59ce06936d0e7f044e Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 12 Sep 2024 16:35:00 -0500 Subject: [PATCH 37/38] Update dbt-adapters minimum version to 1.6.0 --- core/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/setup.py b/core/setup.py index 1c6b9e6c575..c254062486b 100644 --- a/core/setup.py +++ b/core/setup.py @@ -72,7 +72,7 @@ "dbt-semantic-interfaces>=0.7.0,<0.8", # Minor versions for these are expected to be backwards-compatible "dbt-common>=1.6.0,<2.0", - "dbt-adapters>=1.5.0,<2.0", + "dbt-adapters>=1.6.0,<2.0", # ---- # Expect compatibility with all new versions of these packages, so lower bounds only. "packaging>20.9", From f53204f058d77cba98cc14ab09b1696ae018d8b8 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 12 Sep 2024 17:55:21 -0400 Subject: [PATCH 38/38] fix test_artifacts; add event_time to expected_manifest --- tests/functional/artifacts/expected_manifest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/functional/artifacts/expected_manifest.py b/tests/functional/artifacts/expected_manifest.py index 6de0001430f..a50efab9434 100644 --- a/tests/functional/artifacts/expected_manifest.py +++ b/tests/functional/artifacts/expected_manifest.py @@ -761,6 +761,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False): }, "config": { "enabled": True, + "event_time": None, }, "quoting": { "database": None, @@ -1263,6 +1264,7 @@ def expected_references_manifest(project): }, "config": { "enabled": True, + "event_time": None, }, "quoting": { "database": False,