Skip to content

Commit

Permalink
Merge branch 'main' into snapshot_column_names
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank committed Sep 20, 2024
2 parents 12c96bc + 7016cd3 commit 3406e60
Show file tree
Hide file tree
Showing 29 changed files with 1,431 additions and 395 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240903-132428.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enable `--resource-type` and `--exclude-resource-type` CLI flags and environment variables for `dbt test`
time: 2024-09-03T13:24:28.592837+01:00
custom:
Author: TowardOliver dbeatty10
Issue: "10656"
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240913-232111.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Execute microbatch models in batches
time: 2024-09-13T23:21:11.935434-04:00
custom:
Author: michelleark
Issue: "10700"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240917-174446.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix `--resource-type test` for `dbt list` and `dbt build`
time: 2024-09-17T17:44:46.121032-06:00
custom:
Author: dbeatty10
Issue: "10730"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240911-162730.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add Snowplow tracking for behavior flag deprecations
time: 2024-09-11T16:27:30.293832-04:00
custom:
Author: mikealfare
Issue: "10552"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240916-102201.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Replace `TestSelector` with `ResourceTypeSelector`
time: 2024-09-16T10:22:01.339462-06:00
custom:
Author: dbeatty10
Issue: "10718"
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20240918-170325.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Standardize returning `ResourceTypeSelector` instances in `dbt list` and `dbt
build`
time: 2024-09-18T17:03:25.639516-06:00
custom:
Author: dbeatty10
Issue: "10739"
2 changes: 1 addition & 1 deletion .github/workflows/docs-issue.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ jobs:
with:
issue_repository: "dbt-labs/docs.getdbt.com"
issue_title: "[Core] Docs Changes Needed from ${{ github.event.repository.name }} Issue #${{ github.event.issue.number }}"
issue_body: "At a minimum, update body to include a link to the page on docs.getdbt.com requiring updates and what part(s) of the page you would like to see updated."
issue_body: "At a minimum, update body to include a link to the page on docs.getdbt.com requiring updates and what part(s) of the page you would like to see updated.\n Originating from this issue: https://github.com/dbt-labs/dbt-core/issues/${{ github.event.issue.number }}"
secrets: inherit
2 changes: 2 additions & 0 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,8 @@ def freshness(ctx, **kwargs):
@click.pass_context
@global_flags
@p.exclude
@p.resource_type
@p.exclude_resource_type
@p.profiles_dir
@p.project_dir
@p.select
Expand Down
75 changes: 2 additions & 73 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import abc
import os
from copy import deepcopy
from datetime import datetime, timedelta
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -17,7 +16,6 @@
Union,
)

import pytz
from typing_extensions import Protocol

from dbt import selected_resources
Expand All @@ -31,7 +29,6 @@
get_adapter_type_names,
)
from dbt.artifacts.resources import NodeConfig, NodeVersion, RefArgs, SourceConfig
from dbt.artifacts.resources.types import BatchSize
from dbt.clients.jinja import (
MacroGenerator,
MacroStack,
Expand Down Expand Up @@ -234,66 +231,6 @@ def Relation(self):
# Returns 0 when the `EMPTY` attribute on `self.config.args` is present and truthy,
# otherwise None. NOTE(review): presumably None means "no limit" -- confirm with callers.
def resolve_limit(self) -> Optional[int]:
return 0 if getattr(self.config.args, "EMPTY", False) else None

# Returns the current time as a timezone-aware datetime in UTC.
def _build_end_time(self) -> Optional[datetime]:
return datetime.now(tz=pytz.utc)

# Computes a start datetime relative to `checkpoint`, based on the model's
# `batch_size` and `lookback` config values.
#
# Returns None when `is_incremental` is False or `checkpoint` is None.
# Raises DbtRuntimeError when the model has no `batch_size`, and DbtInternalError
# when `batch_size` is not one of the handled BatchSize members.
def _build_start_time(
self, checkpoint: Optional[datetime], is_incremental: bool
) -> Optional[datetime]:
if not is_incremental or checkpoint is None:
return None

assert isinstance(self.model.config, NodeConfig)
batch_size = self.model.config.batch_size
if batch_size is None:
raise DbtRuntimeError(f"The model `{self.model.name}` requires a `batch_size`")

lookback = self.model.config.lookback
if batch_size == BatchSize.hour:
# Truncate the checkpoint to the top of its hour, then go back `lookback` hours.
start = datetime(
checkpoint.year,
checkpoint.month,
checkpoint.day,
checkpoint.hour,
0,
0,
0,
pytz.utc,
) - timedelta(hours=lookback)
elif batch_size == BatchSize.day:
# Truncate the checkpoint to midnight, then go back `lookback` days.
start = datetime(
checkpoint.year, checkpoint.month, checkpoint.day, 0, 0, 0, 0, pytz.utc
) - timedelta(days=lookback)
elif batch_size == BatchSize.month:
# Start from the first day of the checkpoint's month.
start = datetime(checkpoint.year, checkpoint.month, 1, 0, 0, 0, 0, pytz.utc)
for _ in range(lookback):
# Stepping back one day from the 1st lands in the previous month;
# the next line snaps back to the 1st of that month.
start = start - timedelta(days=1)
start = datetime(start.year, start.month, 1, 0, 0, 0, 0, pytz.utc)
elif batch_size == BatchSize.year:
# January 1st of the year `lookback` years before the checkpoint's year.
start = datetime(checkpoint.year - lookback, 1, 1, 0, 0, 0, 0, pytz.utc)
else:
raise DbtInternalError(
f"Batch size `{batch_size}` is not handled during batch calculation"
)

return start

# Returns True only when all of the following hold: a relation for this model is
# found via `self.db_wrapper.get_relation`, its type is "table", the model is
# materialized as "incremental", and neither the `FULL_REFRESH` arg nor the
# model's `full_refresh` config is truthy.
def _is_incremental(self) -> bool:
# TODO: Remove. This is a temporary method. We're working with adapters on
# a strategy to ensure we can access the `is_incremental` logic without drift
relation_info = self.Relation.create_from(self.config, self.model)
relation = self.db_wrapper.get_relation(
relation_info.database, relation_info.schema, relation_info.name
)
return (
relation is not None
and relation.type == "table"
and self.model.config.materialized == "incremental"
and not (
getattr(self.config.args, "FULL_REFRESH", False) or self.model.config.full_refresh
)
)

def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]:
event_time_filter = None
if (
Expand All @@ -303,16 +240,8 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF
and self.model.config.materialized == "incremental"
and self.model.config.incremental_strategy == "microbatch"
):
is_incremental = self._is_incremental()
end: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_END", None)
end = end.replace(tzinfo=pytz.UTC) if end else self._build_end_time()

start: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_START", None)
start = (
start.replace(tzinfo=pytz.UTC)
if start
else self._build_start_time(checkpoint=end, is_incremental=is_incremental)
)
start = self.model.config.get("__dbt_internal_microbatch_event_time_start")
end = self.model.config.get("__dbt_internal_microbatch_event_time_end")

if start is not None or end is not None:
event_time_filter = EventTimeFilter(
Expand Down
3 changes: 3 additions & 0 deletions core/dbt/events/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from functools import partial
from typing import Callable, List

from dbt.tracking import track_behavior_change_warn
from dbt_common.events.base_types import EventLevel, EventMsg
from dbt_common.events.event_manager_client import (
add_callback_to_manager,
add_logger_to_manager,
cleanup_event_logger,
get_event_manager,
Expand Down Expand Up @@ -68,6 +70,7 @@ def setup_event_logger(flags, callbacks: List[Callable[[EventMsg], None]] = [])
make_log_dir_if_missing(flags.LOG_PATH)
event_manager = get_event_manager()
event_manager.callbacks = callbacks.copy()
add_callback_to_manager(track_behavior_change_warn)

if flags.LOG_LEVEL != "none":
line_format = _line_format_from_str(flags.LOG_FORMAT, LineFormat.PlainText)
Expand Down
Empty file.
Empty file.
164 changes: 164 additions & 0 deletions core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

import pytz

from dbt.artifacts.resources.types import BatchSize
from dbt.contracts.graph.nodes import ModelNode, NodeConfig
from dbt.exceptions import DbtInternalError, DbtRuntimeError


class MicrobatchBuilder:
    """A utility class for building microbatch definitions associated with a specific model.

    The builder turns an optional user-supplied event-time window plus the model's
    `batch_size` / `lookback` config into a list of `(start, end)` batch boundaries.
    """

    def __init__(
        self,
        model: ModelNode,
        is_incremental: bool,
        event_time_start: Optional[datetime],
        event_time_end: Optional[datetime],
    ):
        """Validate the model's microbatch configuration and store the requested window.

        Args:
            model: The model to build batches for; must use the 'microbatch'
                incremental strategy and have a `batch_size` config.
            is_incremental: Whether the model is being run incrementally.
            event_time_start: Optional explicit start of the event-time window.
            event_time_end: Optional explicit end of the event-time window.

        Raises:
            DbtInternalError: If the model's incremental_strategy is not 'microbatch'.
            DbtRuntimeError: If the model has no `batch_size` config.
        """
        if model.config.incremental_strategy != "microbatch":
            raise DbtInternalError(
                f"Model '{model.name}' does not use 'microbatch' incremental_strategy."
            )
        self.model = model

        if self.model.config.batch_size is None:
            raise DbtRuntimeError(
                f"Microbatch model '{self.model.name}' does not have a 'batch_size' config (one of {[batch_size.value for batch_size in BatchSize]}) specificed."
            )

        self.is_incremental = is_incremental
        # `replace(tzinfo=...)` stamps UTC onto the value as-is; it does not shift
        # the clock time of an already timezone-aware datetime.
        self.event_time_start = (
            event_time_start.replace(tzinfo=pytz.UTC) if event_time_start else None
        )
        self.event_time_end = event_time_end.replace(tzinfo=pytz.UTC) if event_time_end else None

    def build_end_time(self) -> datetime:
        """Defaults the end_time to the current time in UTC unless a non `None` event_time_end was provided"""
        return self.event_time_end or datetime.now(tz=pytz.utc)

    def build_start_time(self, checkpoint: Optional[datetime]) -> Optional[datetime]:
        """Create a start time based off the passed in checkpoint.

        If an explicit `event_time_start` was provided, it is truncated to the
        model's batch_size and returned regardless of the checkpoint.

        If the checkpoint is `None`, then `None` will be returned as a checkpoint is necessary
        to build a start time. This is because we build the start time relative to the checkpoint
        via the batchsize and offset, and we cannot offset a checkpoint if there is no checkpoint.
        `None` is also returned when the model is not being run incrementally.
        """

        if self.event_time_start:
            return MicrobatchBuilder.truncate_timestamp(
                self.event_time_start, self.model.config.batch_size
            )

        if not self.is_incremental or checkpoint is None:
            # TODO: return new model-level configuration or raise error
            return None

        assert isinstance(self.model.config, NodeConfig)
        batch_size = self.model.config.batch_size

        # Go back `lookback` batches from the (truncated) checkpoint.
        lookback = self.model.config.lookback
        return MicrobatchBuilder.offset_timestamp(checkpoint, batch_size, -1 * lookback)

    def build_batches(
        self, start: Optional[datetime], end: datetime
    ) -> List[Tuple[Optional[datetime], datetime]]:
        """
        Given a start and end datetime, builds a list of batches where each batch is
        the size of the model's batch_size.

        When `start` is `None` a single open-started batch `(None, end)` is returned.
        The final batch is always clipped so that it stops exactly at `end`.
        """
        if start is None:
            return [(start, end)]

        batch_size = self.model.config.batch_size
        curr_batch_start: datetime = start
        curr_batch_end: datetime = MicrobatchBuilder.offset_timestamp(
            curr_batch_start, batch_size, 1
        )

        batches: List[Tuple[Optional[datetime], datetime]] = [(curr_batch_start, curr_batch_end)]
        # Strictly-less-than: once a batch already reaches `end` we are done.
        # Using `<=` here would append an extra zero-width `(end, end)` batch
        # whenever `end` falls exactly on a batch boundary.
        while curr_batch_end < end:
            curr_batch_start = curr_batch_end
            curr_batch_end = MicrobatchBuilder.offset_timestamp(curr_batch_start, batch_size, 1)
            batches.append((curr_batch_start, curr_batch_end))

        # use exact end value as stop
        batches[-1] = (batches[-1][0], end)

        return batches

    @staticmethod
    def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:
        """Truncates the passed in timestamp based on the batch_size and then applies the offset by the batch_size.

        Note: It's important to understand that the offset applies to the truncated timestamp, not
        the origin timestamp. Thus being offset by a day isn't relative to the any given hour that day,
        but relative to the start of the day. So if the timestamp is the very end of a day, 2024-09-17 23:59:59,
        you have a batch size of a day, and an offset of +1, then the returned value ends up being only one
        second later, 2024-09-18 00:00:00.

        2024-09-17 16:06:00 + Batchsize.hour -1 -> 2024-09-17 15:00:00
        2024-09-17 16:06:00 + Batchsize.hour +1 -> 2024-09-17 17:00:00
        2024-09-17 16:06:00 + Batchsize.day -1 -> 2024-09-16 00:00:00
        2024-09-17 16:06:00 + Batchsize.day +1 -> 2024-09-18 00:00:00
        2024-09-17 16:06:00 + Batchsize.month -1 -> 2024-08-01 00:00:00
        2024-09-17 16:06:00 + Batchsize.month +1 -> 2024-10-01 00:00:00
        2024-09-17 16:06:00 + Batchsize.year -1 -> 2023-01-01 00:00:00
        2024-09-17 16:06:00 + Batchsize.year +1 -> 2025-01-01 00:00:00

        Raises:
            DbtInternalError: If `batch_size` is not a handled BatchSize member.
        """
        truncated = MicrobatchBuilder.truncate_timestamp(timestamp, batch_size)

        if batch_size == BatchSize.hour:
            return truncated + timedelta(hours=offset)
        if batch_size == BatchSize.day:
            return truncated + timedelta(days=offset)
        if batch_size == BatchSize.month:
            # Months vary in length, so do the arithmetic on a zero-based month
            # count instead of on days. `truncated` is always the 1st of a month,
            # so `replace` can never produce an invalid day-of-month.
            month_count = truncated.year * 12 + (truncated.month - 1) + offset
            year, month_index = divmod(month_count, 12)
            return truncated.replace(year=year, month=month_index + 1)
        if batch_size == BatchSize.year:
            # `truncated` is January 1st, so changing the year is always valid.
            return truncated.replace(year=truncated.year + offset)

        raise DbtInternalError(
            f"Batch size `{batch_size}` is not handled during batch calculation"
        )

    @staticmethod
    def truncate_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime:
        """Truncates the passed in timestamp based on the batch_size.

        The result is always stamped with UTC tzinfo; the input's own tzinfo is
        not consulted.

        2024-09-17 16:06:00 + Batchsize.hour -> 2024-09-17 16:00:00
        2024-09-17 16:06:00 + Batchsize.day -> 2024-09-17 00:00:00
        2024-09-17 16:06:00 + Batchsize.month -> 2024-09-01 00:00:00
        2024-09-17 16:06:00 + Batchsize.year -> 2024-01-01 00:00:00

        Raises:
            DbtInternalError: If `batch_size` is not a handled BatchSize member.
        """
        if batch_size == BatchSize.hour:
            return datetime(
                timestamp.year,
                timestamp.month,
                timestamp.day,
                timestamp.hour,
                0,
                0,
                0,
                pytz.utc,
            )
        if batch_size == BatchSize.day:
            return datetime(timestamp.year, timestamp.month, timestamp.day, 0, 0, 0, 0, pytz.utc)
        if batch_size == BatchSize.month:
            return datetime(timestamp.year, timestamp.month, 1, 0, 0, 0, 0, pytz.utc)
        if batch_size == BatchSize.year:
            return datetime(timestamp.year, 1, 1, 0, 0, 0, 0, pytz.utc)

        # Previously an unhandled batch size fell through to an unbound local.
        raise DbtInternalError(
            f"Batch size `{batch_size}` is not handled during batch calculation"
        )
5 changes: 5 additions & 0 deletions core/dbt/node_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
NodeType.Snapshot,
]

# Node types grouped together as "test" resources: NodeType.Test and NodeType.Unit.
TEST_NODE_TYPES: List["NodeType"] = [
NodeType.Test,
NodeType.Unit,
]

VERSIONED_NODE_TYPES: List["NodeType"] = [
NodeType.Model,
]
7 changes: 0 additions & 7 deletions core/dbt/task/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from dbt.graph import Graph, GraphQueue, ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.task.base import BaseRunner, resource_types_from_args
from dbt.task.test import TestSelector
from dbt_common.events.functions import fire_event

from .run import ModelRunner as run_model_runner
Expand Down Expand Up @@ -197,12 +196,6 @@ def get_node_selector(self, no_unit_tests=False) -> ResourceTypeSelector:

resource_types = self.resource_types(no_unit_tests)

if resource_types == [NodeType.Test]:
return TestSelector(
graph=self.graph,
manifest=self.manifest,
previous_state=self.previous_state,
)
return ResourceTypeSelector(
graph=self.graph,
manifest=self.manifest,
Expand Down
253 changes: 214 additions & 39 deletions core/dbt/task/docs/index.html

Large diffs are not rendered by default.

Loading

0 comments on commit 3406e60

Please sign in to comment.