Skip to content

Commit

Permalink
Merge branch 'main' into qmalcolm--10874-make-event-time-start-end-mu…
Browse files Browse the repository at this point in the history
…tually-required
  • Loading branch information
MichelleArk committed Oct 29, 2024
2 parents 917d8d8 + 6b5db17 commit daeabd9
Show file tree
Hide file tree
Showing 43 changed files with 1,621 additions and 531 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241001-134051.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enable use of multi-column unique key in snapshots
time: 2024-10-01T13:40:51.297529-04:00
custom:
Author: gshank
Issue: "9992"
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241028-173419.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Emit warning when microbatch model has no input with `event_time` config
time: 2024-10-28T17:34:19.195209-04:00
custom:
Author: michelleark
Issue: "10926"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241017-134857.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Change `lookback` default from `0` to `1` to ensure better data completeness
time: 2024-10-17T13:48:57.805205-07:00
custom:
Author: QMalcolm
Issue: "10867"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241022-222927.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Implement partial parsing for all-yaml snapshots
time: 2024-10-22T22:29:27.396378-04:00
custom:
Author: gshank
Issue: "10903"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241023-152054.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Restore source quoting behaviour when quoting config provided in dbt_project.yml
time: 2024-10-23T15:20:54.766893-04:00
custom:
Author: michelleark
Issue: "10892"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241024-104938.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix bug when referencing deprecated models
time: 2024-10-24T10:49:38.352328-06:00
custom:
Author: dbeatty10 danlsn
Issue: "10915"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241028-132751.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: 'Fix ''model'' jinja context variable type to dict '
time: 2024-10-28T13:27:51.604093-04:00
custom:
Author: michelleark
Issue: "10927"
2 changes: 1 addition & 1 deletion core/dbt/artifacts/resources/v1/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class NodeConfig(NodeAndTestConfig):
materialized: str = "view"
incremental_strategy: Optional[str] = None
batch_size: Any = None
lookback: Any = 0
lookback: Any = 1
begin: Any = None
persist_docs: Dict[str, Any] = field(default_factory=dict)
post_hook: List[Hook] = field(
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/artifacts/resources/v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class SnapshotMetaColumnNames(dbtClassMixin):
class SnapshotConfig(NodeConfig):
materialized: str = "snapshot"
strategy: Optional[str] = None
unique_key: Optional[str] = None
unique_key: Optional[Union[str, List[str]]] = None
target_schema: Optional[str] = None
target_database: Optional[str] = None
updated_at: Optional[str] = None
Expand Down
8 changes: 3 additions & 5 deletions core/dbt/artifacts/schemas/results.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, Union

from dbt.contracts.graph.nodes import ResultNode
from dbt_common.dataclass_schema import StrEnum, dbtClassMixin
Expand All @@ -16,7 +16,7 @@ class TimingInfo(dbtClassMixin):
Do not call directly, use `collect_timing_info` instead.
"""

name: Literal["compile", "execute", "other"]
name: str
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None

Expand All @@ -37,9 +37,7 @@ def to_msg_dict(self):

# This is a context manager
class collect_timing_info:
def __init__(
self, name: Literal["compile", "execute", "other"], callback: Callable[[TimingInfo], None]
) -> None:
def __init__(self, name: str, callback: Callable[[TimingInfo], None]) -> None:
self.timing_info = TimingInfo(name=name)
self.callback = callback

Expand Down
9 changes: 8 additions & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,8 +684,15 @@ def resolve(self, source_name: str, table_name: str):
target_kind="source",
disabled=(isinstance(target_source, Disabled)),
)

# Source quoting does _not_ respect global configs in dbt_project.yml, as documented here:
# https://docs.getdbt.com/reference/project-configs/quoting
# Use an object with an empty quoting field to bypass any settings in self.
class SourceQuotingBaseConfig:
quoting: Dict[str, Any] = {}

return self.Relation.create_from(
self.config,
SourceQuotingBaseConfig(),
target_source,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter(target_source),
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ class SchemaSourceFile(BaseSourceFile):
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
metrics: List[str] = field(default_factory=list)
snapshots: List[str] = field(default_factory=list)
# The following field will no longer be used. Leaving
# here to avoid breaking existing projects. To be removed
# later if possible.
Expand Down
9 changes: 6 additions & 3 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
SeedNode,
SemanticModel,
SingularTestNode,
SnapshotNode,
SourceDefinition,
UnitTestDefinition,
UnitTestFileFixture,
Expand Down Expand Up @@ -1600,12 +1601,14 @@ def add_node(self, source_file: AnySourceFile, node: ManifestNode, test_from=Non
if isinstance(node, GenericTestNode):
assert test_from
source_file.add_test(node.unique_id, test_from)
if isinstance(node, Metric):
elif isinstance(node, Metric):
source_file.metrics.append(node.unique_id)
if isinstance(node, Exposure):
elif isinstance(node, Exposure):
source_file.exposures.append(node.unique_id)
if isinstance(node, Group):
elif isinstance(node, Group):
source_file.groups.append(node.unique_id)
elif isinstance(node, SnapshotNode):
source_file.snapshots.append(node.unique_id)
elif isinstance(source_file, FixtureSourceFile):
pass
else:
Expand Down
11 changes: 11 additions & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,17 @@ message FreshnessConfigProblemMsg {
}


// I074
message MicrobatchModelNoEventTimeInputs {
string model_name = 1;
}

message MicrobatchModelNoEventTimeInputsMsg {
CoreEventInfo info = 1;
MicrobatchModelNoEventTimeInputs data = 2;
}


// M - Deps generation


Expand Down
866 changes: 435 additions & 431 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,19 @@ def message(self) -> str:
return self.msg


class MicrobatchModelNoEventTimeInputs(WarnLevel):
def code(self) -> str:
return "I074"

def message(self) -> str:
msg = (
f"The microbatch model '{self.model_name}' has no 'ref' or 'source' input with an 'event_time' configuration. "
"\nThis means no filtering can be applied and can result in unexpected duplicate records in the resulting microbatch model."
)

return warning_tag(msg)


# =======================================================
# M - Deps generation
# =======================================================
Expand Down
22 changes: 21 additions & 1 deletion core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime, timedelta
from typing import List, Optional
from typing import Any, Dict, List, Optional

import pytz

Expand Down Expand Up @@ -99,6 +99,26 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]:

return batches

def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]:
"""
Create context with entries that reflect microbatch model + incremental execution state
Assumes self.model has been (re)-compiled with necessary batch filters applied.
"""
batch_context: Dict[str, Any] = {}

# Microbatch model properties
batch_context["model"] = self.model.to_dict()
batch_context["sql"] = self.model.compiled_code
batch_context["compiled_code"] = self.model.compiled_code

# Add incremental context variables for batches running incrementally
if incremental_batch:
batch_context["is_incremental"] = lambda: True
batch_context["should_full_refresh"] = lambda: False

return batch_context

@staticmethod
def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:
"""Truncates the passed in timestamp based on the batch_size and then applies the offset by the batch_size.
Expand Down
17 changes: 12 additions & 5 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
DeprecatedModel,
DeprecatedReference,
InvalidDisabledTargetInTestNode,
MicrobatchModelNoEventTimeInputs,
NodeNotFoundOrDisabled,
ParsedFileLoadFailed,
ParsePerfInfoPath,
Expand Down Expand Up @@ -590,7 +591,7 @@ def check_for_model_deprecations(self):
# Get the child_nodes and check for deprecations.
child_nodes = self.manifest.child_map[node.unique_id]
for child_unique_id in child_nodes:
child_node = self.manifest.nodes[child_unique_id]
child_node = self.manifest.nodes.get(child_unique_id)
if not isinstance(child_node, ModelNode):
continue
if node.is_past_deprecation_date:
Expand Down Expand Up @@ -1442,13 +1443,19 @@ def check_valid_microbatch_config(self):
)

# Validate upstream node event_time (if configured)
has_input_with_event_time_config = False
for input_unique_id in node.depends_on.nodes:
input_node = self.manifest.expect(unique_id=input_unique_id)
input_event_time = input_node.config.event_time
if input_event_time and not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
if input_event_time:
if not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
has_input_with_event_time_config = True

if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

def write_perf_info(self, target_path: str):
path = os.path.join(target_path, PERF_INFO_FILE_NAME)
Expand Down
21 changes: 21 additions & 0 deletions core/dbt/parser/partial.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,10 +658,14 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
key_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
if key_diff["changed"]:
for elem in key_diff["changed"]:
if dict_key == "snapshots" and "relation" in elem:
self.delete_yaml_snapshot(schema_file, elem)
self.delete_schema_mssa_links(schema_file, dict_key, elem)
self.merge_patch(schema_file, dict_key, elem, True)
if key_diff["deleted"]:
for elem in key_diff["deleted"]:
if dict_key == "snapshots" and "relation" in elem:
self.delete_yaml_snapshot(schema_file, elem)
self.delete_schema_mssa_links(schema_file, dict_key, elem)
if key_diff["added"]:
for elem in key_diff["added"]:
Expand All @@ -673,6 +677,8 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
continue
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
if elem:
if dict_key == "snapshots" and "relation" in elem:
self.delete_yaml_snapshot(schema_file, elem)
self.delete_schema_mssa_links(schema_file, dict_key, elem)
self.merge_patch(schema_file, dict_key, elem, True)

Expand Down Expand Up @@ -828,6 +834,8 @@ def delete_schema_mssa_links(self, schema_file, dict_key, elem):
# remove elem node and remove unique_id from node_patches
for elem_unique_id in elem_unique_ids:
# might have been already removed
# For all-yaml snapshots, we don't do this, since the node
# should have already been removed.
if (
elem_unique_id in self.saved_manifest.nodes
or elem_unique_id in self.saved_manifest.disabled
Expand Down Expand Up @@ -868,6 +876,19 @@ def remove_tests(self, schema_file, dict_key, name):
self.saved_manifest.nodes.pop(test_unique_id)
schema_file.remove_tests(dict_key, name)

def delete_yaml_snapshot(self, schema_file, snapshot_dict):
snapshot_name = snapshot_dict["name"]
snapshots = schema_file.snapshots.copy()
for unique_id in snapshots:
if unique_id in self.saved_manifest.nodes:
snapshot = self.saved_manifest.nodes[unique_id]
if snapshot.name == snapshot_name:
self.saved_manifest.nodes.pop(unique_id)
schema_file.snapshots.remove(unique_id)
elif unique_id in self.saved_manifest.disabled:
self.delete_disabled(unique_id, schema_file.file_id)
schema_file.snapshots.remove(unique_id)

def delete_schema_source(self, schema_file, source_dict):
# both patches, tests, and source nodes
source_name = source_dict["name"]
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,9 @@ def _add_yaml_snapshot_nodes_to_manifest(
snapshot_node.raw_code = "select * from {{ " + snapshot["relation"] + " }}"

# Add our new node to the manifest, and note that ref lookup collections
# will need to be rebuilt.
self.manifest.add_node_nofile(snapshot_node)
# will need to be rebuilt. This adds the node unique_id to the "snapshots"
# list in the SchemaSourceFile.
self.manifest.add_node(block.file, snapshot_node)
rebuild_refs = True

if rebuild_refs:
Expand Down
Loading

0 comments on commit daeabd9

Please sign in to comment.