Skip to content

Commit

Permalink
raise MicrobatchModelNoEventTimeInputs warning when no microbatch inp…
Browse files Browse the repository at this point in the history
…ut has event_time config
  • Loading branch information
MichelleArk committed Oct 28, 2024
1 parent 316ecfc commit 2845b0a
Show file tree
Hide file tree
Showing 5 changed files with 478 additions and 439 deletions.
11 changes: 11 additions & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,17 @@ message FreshnessConfigProblemMsg {
}


// I074
// Payload for the warning fired when a microbatch model has no input
// with an 'event_time' configuration.
message MicrobatchModelNoEventTimeInputs {
// Name of the microbatch model the warning refers to.
string model_name = 1;
}

// Wrapper message that pairs the MicrobatchModelNoEventTimeInputs payload
// with a CoreEventInfo field.
message MicrobatchModelNoEventTimeInputsMsg {
// NOTE(review): presumably the common event metadata shared by all core events - confirm against CoreEventInfo.
CoreEventInfo info = 1;
// The event-specific payload (carries model_name).
MicrobatchModelNoEventTimeInputs data = 2;
}


// M - Deps generation


Expand Down
866 changes: 435 additions & 431 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,19 @@ def message(self) -> str:
return self.msg


class MicrobatchModelNoEventTimeInputs(WarnLevel):
    """Warning event (code I074) about a microbatch model with no 'event_time' input.

    The message text is built from ``self.model_name``.
    """

    def code(self) -> str:
        """Return the event code identifying this warning."""
        return "I074"

    def message(self) -> str:
        """Return the warning text for the model, passed through ``warning_tag``."""
        summary = f"The microbatch model '{self.model_name}' has no 'ref' or 'source' input with an 'event_time' configuration. "
        consequence = "This can result in unexpected duplicate records in the resulting microbatch model."
        return warning_tag(summary + consequence)


# =======================================================
# M - Deps generation
# =======================================================
Expand Down
15 changes: 11 additions & 4 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
DeprecatedModel,
DeprecatedReference,
InvalidDisabledTargetInTestNode,
MicrobatchModelNoEventTimeInputs,
NodeNotFoundOrDisabled,
ParsedFileLoadFailed,
ParsePerfInfoPath,
Expand Down Expand Up @@ -1442,13 +1443,19 @@ def check_valid_microbatch_config(self):
)

# Validate upstream node event_time (if configured)
has_input_with_event_time_config = False
for input_unique_id in node.depends_on.nodes:
input_node = self.manifest.expect(unique_id=input_unique_id)
input_event_time = input_node.config.event_time
if input_event_time and not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
if input_event_time:
if not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
has_input_with_event_time_config = True

if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

def write_perf_info(self, target_path: str):
path = os.path.join(target_path, PERF_INFO_FILE_NAME)
Expand Down
12 changes: 8 additions & 4 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest
import pytz

from dbt.events.types import LogModelResult
from dbt.events.types import LogModelResult, MicrobatchModelNoEventTimeInputs
from dbt.tests.util import (
get_artifact,
patch_microbatch_end_time,
Expand Down Expand Up @@ -47,7 +47,6 @@
select * from {{ ref('microbatch_model') }}
"""


microbatch_model_ref_render_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model').render() }}
Expand Down Expand Up @@ -290,9 +289,11 @@ def test_run_with_event_time(self, project):
run_dbt(["seed"])

# initial run -- backfills all data
catcher = EventCatcher(event_to_catch=MicrobatchModelNoEventTimeInputs)
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
run_dbt(["run"], callbacks=[catcher.catch])
self.assert_row_count(project, "microbatch_model", 3)
assert len(catcher.caught_events) == 0

# our partition grain is "day" so running the same day without new data should produce the same results
with patch_microbatch_end_time("2020-01-03 14:57:00"):
Expand Down Expand Up @@ -334,10 +335,13 @@ def models(self):

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
catcher = EventCatcher(event_to_catch=MicrobatchModelNoEventTimeInputs)

# initial run -- backfills all data
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
run_dbt(["run"], callbacks=[catcher.catch])
self.assert_row_count(project, "microbatch_model", 3)
assert len(catcher.caught_events) == 1

# our partition grain is "day" so running the same day without new data should produce the same results
with patch_microbatch_end_time("2020-01-03 14:57:00"):
Expand Down

0 comments on commit 2845b0a

Please sign in to comment.