diff --git a/.changes/unreleased/Features-20241001-134051.yaml b/.changes/unreleased/Features-20241001-134051.yaml new file mode 100644 index 00000000000..60ada51ece3 --- /dev/null +++ b/.changes/unreleased/Features-20241001-134051.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Enable use of multi-column unique key in snapshots +time: 2024-10-01T13:40:51.297529-04:00 +custom: + Author: gshank + Issue: "9992" diff --git a/.changes/unreleased/Fixes-20241018-135810.yaml b/.changes/unreleased/Fixes-20241018-135810.yaml new file mode 100644 index 00000000000..c205e15bb09 --- /dev/null +++ b/.changes/unreleased/Fixes-20241018-135810.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Exclude hook result from results in on-run-end context +time: 2024-10-18T13:58:10.396884-07:00 +custom: + Author: ChenyuLInx + Issue: "7387" diff --git a/.changes/unreleased/Fixes-20241022-222927.yaml b/.changes/unreleased/Fixes-20241022-222927.yaml new file mode 100644 index 00000000000..cd294862ba4 --- /dev/null +++ b/.changes/unreleased/Fixes-20241022-222927.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Implement partial parsing for all-yaml snapshots +time: 2024-10-22T22:29:27.396378-04:00 +custom: + Author: gshank + Issue: "10903" diff --git a/core/dbt/artifacts/resources/v1/snapshot.py b/core/dbt/artifacts/resources/v1/snapshot.py index 062b6a62814..1a7b9344ca0 100644 --- a/core/dbt/artifacts/resources/v1/snapshot.py +++ b/core/dbt/artifacts/resources/v1/snapshot.py @@ -19,7 +19,7 @@ class SnapshotMetaColumnNames(dbtClassMixin): class SnapshotConfig(NodeConfig): materialized: str = "snapshot" strategy: Optional[str] = None - unique_key: Optional[str] = None + unique_key: Optional[Union[str, List[str]]] = None target_schema: Optional[str] = None target_database: Optional[str] = None updated_at: Optional[str] = None diff --git a/core/dbt/artifacts/schemas/results.py b/core/dbt/artifacts/schemas/results.py index 00746c87885..dd455f309b8 100644 --- a/core/dbt/artifacts/schemas/results.py +++ b/core/dbt/artifacts/schemas/results.py @@ -10,6 +10,12 @@ @dataclass class TimingInfo(dbtClassMixin): + """ + Represents a step in the execution of a node. + `name` should be one of: compile, execute, or other + Do not call directly, use `collect_timing_info` instead. + """ + name: str started_at: Optional[datetime] = None completed_at: Optional[datetime] = None @@ -21,7 +27,7 @@ def end(self): self.completed_at = datetime.utcnow() def to_msg_dict(self): - msg_dict = {"name": self.name} + msg_dict = {"name": str(self.name)} if self.started_at: msg_dict["started_at"] = datetime_to_json_string(self.started_at) if self.completed_at: diff --git a/core/dbt/contracts/files.py b/core/dbt/contracts/files.py index d5c1dba5366..15e951e026c 100644 --- a/core/dbt/contracts/files.py +++ b/core/dbt/contracts/files.py @@ -192,6 +192,7 @@ class SchemaSourceFile(BaseSourceFile): sources: List[str] = field(default_factory=list) exposures: List[str] = field(default_factory=list) metrics: List[str] = field(default_factory=list) + snapshots: List[str] = field(default_factory=list) # The following field will no longer be used. Leaving # here to avoid breaking existing projects. To be removed # later if possible. diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 4ce887591d9..d387616a8ea 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -59,6 +59,7 @@ SeedNode, SemanticModel, SingularTestNode, + SnapshotNode, SourceDefinition, UnitTestDefinition, UnitTestFileFixture, @@ -1600,12 +1601,14 @@ def add_node(self, source_file: AnySourceFile, node: ManifestNode, test_from=Non if isinstance(node, GenericTestNode): assert test_from source_file.add_test(node.unique_id, test_from) - if isinstance(node, Metric): + elif isinstance(node, Metric): source_file.metrics.append(node.unique_id) - if isinstance(node, Exposure): + elif isinstance(node, Exposure): source_file.exposures.append(node.unique_id) - if isinstance(node, Group): + elif isinstance(node, Group): source_file.groups.append(node.unique_id) + elif isinstance(node, SnapshotNode): + source_file.snapshots.append(node.unique_id) elif isinstance(source_file, FixtureSourceFile): pass else: diff --git a/core/dbt/parser/partial.py b/core/dbt/parser/partial.py index 774edf8ce6d..d4e20a617e1 100644 --- a/core/dbt/parser/partial.py +++ b/core/dbt/parser/partial.py @@ -658,10 +658,14 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict key_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict) if key_diff["changed"]: for elem in key_diff["changed"]: + if dict_key == "snapshots" and "relation" in elem: + self.delete_yaml_snapshot(schema_file, elem) self.delete_schema_mssa_links(schema_file, dict_key, elem) self.merge_patch(schema_file, dict_key, elem, True) if key_diff["deleted"]: for elem in key_diff["deleted"]: + if dict_key == "snapshots" and "relation" in elem: + self.delete_yaml_snapshot(schema_file, elem) self.delete_schema_mssa_links(schema_file, dict_key, elem) if key_diff["added"]: for elem in key_diff["added"]: @@ -673,6 +677,8 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict continue elem = self.get_schema_element(new_yaml_dict[dict_key], name) if elem: + if dict_key == "snapshots" and "relation" in elem: + self.delete_yaml_snapshot(schema_file, elem) self.delete_schema_mssa_links(schema_file, dict_key, elem) self.merge_patch(schema_file, dict_key, elem, True) @@ -828,6 +834,8 @@ def delete_schema_mssa_links(self, schema_file, dict_key, elem): # remove elem node and remove unique_id from node_patches for elem_unique_id in elem_unique_ids: # might have been already removed + # For all-yaml snapshots, we don't do this, since the node + # should have already been removed. if ( elem_unique_id in self.saved_manifest.nodes or elem_unique_id in self.saved_manifest.disabled @@ -868,6 +876,19 @@ def remove_tests(self, schema_file, dict_key, name): self.saved_manifest.nodes.pop(test_unique_id) schema_file.remove_tests(dict_key, name) + def delete_yaml_snapshot(self, schema_file, snapshot_dict): + snapshot_name = snapshot_dict["name"] + snapshots = schema_file.snapshots.copy() + for unique_id in snapshots: + if unique_id in self.saved_manifest.nodes: + snapshot = self.saved_manifest.nodes[unique_id] + if snapshot.name == snapshot_name: + self.saved_manifest.nodes.pop(unique_id) + schema_file.snapshots.remove(unique_id) + elif unique_id in self.saved_manifest.disabled: + self.delete_disabled(unique_id, schema_file.file_id) + schema_file.snapshots.remove(unique_id) + def delete_schema_source(self, schema_file, source_dict): # both patches, tests, and source nodes source_name = source_dict["name"] diff --git a/core/dbt/parser/schemas.py b/core/dbt/parser/schemas.py index 077d7083ed5..e0c94b8b444 100644 --- a/core/dbt/parser/schemas.py +++ b/core/dbt/parser/schemas.py @@ -309,8 +309,9 @@ def _add_yaml_snapshot_nodes_to_manifest( snapshot_node.raw_code = "select * from {{ " + snapshot["relation"] + " }}" # Add our new node to the manifest, and note that ref lookup collections - # will need to be rebuilt. - self.manifest.add_node_nofile(snapshot_node) + # will need to be rebuilt. This adds the node unique_id to the "snapshots" + # list in the SchemaSourceFile. + self.manifest.add_node(block.file, snapshot_node) rebuild_refs = True if rebuild_refs: diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 99913a551c5..7a321e69d30 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -17,6 +17,7 @@ RunningStatus, RunStatus, TimingInfo, + collect_timing_info, ) from dbt.artifacts.schemas.run import RunResult from dbt.cli.flags import Flags @@ -633,7 +634,6 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: def safe_run_hooks( self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any] ) -> RunStatus: - started_at = datetime.utcnow() ordered_hooks = self.get_hooks_by_type(hook_type) if hook_type == RunHookType.End and ordered_hooks: @@ -653,14 +653,20 @@ def safe_run_hooks( hook.index = idx hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}" execution_time = 0.0 - timing = [] + timing: List[TimingInfo] = [] failures = 1 if not failed: + with collect_timing_info("compile", timing.append): + sql = self.get_hook_sql( + adapter, hook, hook.index, num_hooks, extra_context + ) + + started_at = timing[0].started_at or datetime.utcnow() hook.update_event_status( started_at=started_at.isoformat(), node_status=RunningStatus.Started ) - sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context) + fire_event( LogHookStartLine( statement=hook_name, @@ -670,11 +676,12 @@ def safe_run_hooks( ) ) - status, message = get_execution_status(sql, adapter) - finished_at = datetime.utcnow() + with collect_timing_info("execute", timing.append): + status, message = get_execution_status(sql, adapter) + + finished_at = timing[1].completed_at or datetime.utcnow() hook.update_event_status(finished_at=finished_at.isoformat()) execution_time = (finished_at - started_at).total_seconds() - timing = [TimingInfo(hook_name, started_at, finished_at)] failures = 0 if status == RunStatus.Success else 1 if status == RunStatus.Success: @@ -767,7 +774,9 @@ def after_run(self, adapter, results) -> None: extras = { "schemas": list({s for _, s in database_schema_set}), - "results": results, + "results": [ + r for r in results if r.thread_id != "main" or r.status == RunStatus.Error + ], # exclude that didn't fail to preserve backwards compatibility "database_schemas": list(database_schema_set), } with adapter.connection_named("master"): diff --git a/core/dbt/task/run_operation.py b/core/dbt/task/run_operation.py index 793ba81fb01..ebe8b14352e 100644 --- a/core/dbt/task/run_operation.py +++ b/core/dbt/task/run_operation.py @@ -2,11 +2,11 @@ import threading import traceback from datetime import datetime -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, List import dbt_common.exceptions from dbt.adapters.factory import get_adapter -from dbt.artifacts.schemas.results import RunStatus, TimingInfo +from dbt.artifacts.schemas.results import RunStatus, TimingInfo, collect_timing_info from dbt.artifacts.schemas.run import RunResult, RunResultsArtifact from dbt.contracts.files import FileHash from dbt.contracts.graph.nodes import HookNode @@ -51,25 +51,29 @@ def _run_unsafe(self, package_name, macro_name) -> "agate.Table": return res def run(self) -> RunResultsArtifact: - start = datetime.utcnow() - self.compile_manifest() + timing: List[TimingInfo] = [] - success = True + with collect_timing_info("compile", timing.append): + self.compile_manifest() + + start = timing[0].started_at + success = True package_name, macro_name = self._get_macro_parts() - try: - self._run_unsafe(package_name, macro_name) - except dbt_common.exceptions.DbtBaseException as exc: - fire_event(RunningOperationCaughtError(exc=str(exc))) - fire_event(LogDebugStackTrace(exc_info=traceback.format_exc())) - success = False - except Exception as exc: - fire_event(RunningOperationUncaughtError(exc=str(exc))) - fire_event(LogDebugStackTrace(exc_info=traceback.format_exc())) - success = False + with collect_timing_info("execute", timing.append): + try: + self._run_unsafe(package_name, macro_name) + except dbt_common.exceptions.DbtBaseException as exc: + fire_event(RunningOperationCaughtError(exc=str(exc))) + fire_event(LogDebugStackTrace(exc_info=traceback.format_exc())) + success = False + except Exception as exc: + fire_event(RunningOperationUncaughtError(exc=str(exc))) + fire_event(LogDebugStackTrace(exc_info=traceback.format_exc())) + success = False - end = datetime.utcnow() + end = timing[1].completed_at macro = ( self.manifest.find_macro_by_name(macro_name, self.config.project_name, package_name) @@ -85,10 +89,12 @@ def run(self) -> RunResultsArtifact: f"dbt could not find a macro with the name '{macro_name}' in any package" ) + execution_time = (end - start).total_seconds() if start and end else 0.0 + run_result = RunResult( adapter_response={}, status=RunStatus.Success if success else RunStatus.Error, - execution_time=(end - start).total_seconds(), + execution_time=execution_time, failures=0 if success else 1, message=None, node=HookNode( @@ -105,13 +111,13 @@ def run(self) -> RunResultsArtifact: original_file_path="", ), thread_id=threading.current_thread().name, - timing=[TimingInfo(name=macro_name, started_at=start, completed_at=end)], + timing=timing, batch_results=None, ) results = RunResultsArtifact.from_execution_results( - generated_at=end, - elapsed_time=(end - start).total_seconds(), + generated_at=end or datetime.utcnow(), + elapsed_time=execution_time, args={ k: v for k, v in self.args.__dict__.items() diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index a37308f81ea..7ad9d87e10d 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -512,7 +512,6 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]): self.started_at = time.time() try: before_run_status = self.before_run(adapter, selected_uids) - if before_run_status == RunStatus.Success or ( not get_flags().skip_nodes_if_on_run_start_fails ): diff --git a/core/setup.py b/core/setup.py index b7a8dabd14e..b787ce8a923 100644 --- a/core/setup.py +++ b/core/setup.py @@ -71,7 +71,7 @@ "dbt-extractor>=0.5.0,<=0.6", "dbt-semantic-interfaces>=0.7.3,<0.8", # Minor versions for these are expected to be backwards-compatible - "dbt-common>=1.9.0,<2.0", + "dbt-common>=1.11.0,<2.0", "dbt-adapters>=1.7.0,<2.0", # ---- # Expect compatibility with all new versions of these packages, so lower bounds only. diff --git a/schemas/dbt/manifest/v12.json b/schemas/dbt/manifest/v12.json index fb85d40c9c0..0a8322611e4 100644 --- a/schemas/dbt/manifest/v12.json +++ b/schemas/dbt/manifest/v12.json @@ -6540,6 +6540,12 @@ { "type": "string" }, + { + "type": "array", + "items": { + "type": "string" + } + }, { "type": "null" } @@ -16425,6 +16431,12 @@ { "type": "string" }, + { + "type": "array", + "items": { + "type": "string" + } + }, { "type": "null" } diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py index 42edbdae970..b85784be3cf 100644 --- a/tests/functional/adapter/hooks/test_on_run_hooks.py +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -55,6 +55,14 @@ def test_results(self, project, log_counts, my_model_run_status): for result in results if isinstance(result.node, HookNode) ] == [(id, str(status)) for id, status in expected_results if id.startswith("operation")] + + for result in results: + if result.status == RunStatus.Skipped: + continue + + timing_keys = [timing.name for timing in result.timing] + assert timing_keys == ["compile", "execute"] + assert log_counts in log_output assert "4 project hooks, 1 view model" in log_output @@ -160,3 +168,77 @@ def test_results(self, project): run_results = get_artifact(project.project_root, "target", "run_results.json") assert run_results["results"] == [] + + +class Test__HookContext__HookSuccess: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-start": [ + "select 1 as id", # success + "select 1 as id", # success + ], + "on-run-end": [ + '{{ log("Num Results in context: " ~ results|length)}}' + "{{ output_thread_ids(results) }}", + ], + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "log.sql": """ +{% macro output_thread_ids(results) %} + {% for result in results %} + {{ log("Thread ID: " ~ result.thread_id) }} + {% endfor %} +{% endmacro %} +""" + } + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": "select 1"} + + def test_results_in_context_success(self, project): + results, log_output = run_dbt_and_capture(["--debug", "run"]) + assert "Thread ID: " in log_output + assert "Thread ID: main" not in log_output + assert results[0].thread_id == "main" # hook still exists in run results + assert "Num Results in context: 1" in log_output # only model given hook was successful + + +class Test__HookContext__HookFail: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-start": [ + "select a as id", # fail + ], + "on-run-end": [ + '{{ log("Num Results in context: " ~ results|length)}}' + "{{ output_thread_ids(results) }}", + ], + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "log.sql": """ +{% macro output_thread_ids(results) %} + {% for result in results %} + {{ log("Thread ID: " ~ result.thread_id) }} + {% endfor %} +{% endmacro %} +""" + } + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": "select 1"} + + def test_results_in_context_hook_fail(self, project): + results, log_output = run_dbt_and_capture(["--debug", "run"], expect_pass=False) + assert "Thread ID: main" in log_output + assert results[0].thread_id == "main" + assert "Num Results in context: 2" in log_output # failed hook and model diff --git a/tests/functional/adapter/simple_snapshot/fixtures.py b/tests/functional/adapter/simple_snapshot/fixtures.py new file mode 100644 index 00000000000..cec28a7d64d --- /dev/null +++ b/tests/functional/adapter/simple_snapshot/fixtures.py @@ -0,0 +1,430 @@ +create_seed_sql = """ +create table {schema}.seed ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at TIMESTAMP +); +""" + +create_snapshot_expected_sql = """ +create table {schema}.snapshot_expected ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at TIMESTAMP, + test_valid_from TIMESTAMP, + test_valid_to TIMESTAMP, + test_scd_id TEXT, + test_updated_at TIMESTAMP +); +""" + + +seed_insert_sql = """ +-- seed inserts +-- use the same email for two users to verify that duplicated check_cols values +-- are handled appropriately +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'), +(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'), +(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'), +(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'), +(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'), +(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'), +(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'), +(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'), +(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'), +(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'), +(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'), +(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'), +(13, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'), +(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'), +(15, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'), +(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'), +(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'), +(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'), +(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'), +(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19'); +""" + + +populate_snapshot_expected_sql = """ +-- populate snapshot table +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + null::timestamp as test_valid_to, + updated_at as test_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as test_scd_id +from {schema}.seed; +""" + +populate_snapshot_expected_valid_to_current_sql = """ +-- populate snapshot table +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + date('2099-12-31') as test_valid_to, + updated_at as test_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as test_scd_id +from {schema}.seed; +""" + +snapshot_actual_sql = """ +{% snapshot snapshot_actual %} + + {{ + config( + unique_key='id || ' ~ "'-'" ~ ' || first_name', + ) + }} + + select * from {{target.database}}.{{target.schema}}.seed + +{% endsnapshot %} +""" + +snapshots_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at + snapshot_meta_column_names: + dbt_valid_to: test_valid_to + dbt_valid_from: test_valid_from + dbt_scd_id: test_scd_id + dbt_updated_at: test_updated_at +""" + +snapshots_no_column_names_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at +""" + +ref_snapshot_sql = """ +select * from {{ ref('snapshot_actual') }} +""" + + +invalidate_sql = """ +-- update records 11 - 21. Change email and updated_at field +update {schema}.seed set + updated_at = updated_at + interval '1 hour', + email = case when id = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end +where id >= 10 and id <= 20; + + +-- invalidate records 11 - 21 +update {schema}.snapshot_expected set + test_valid_to = updated_at + interval '1 hour' +where id >= 10 and id <= 20; + +""" + +update_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + null::timestamp as test_valid_to, + updated_at as test_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as test_scd_id +from {schema}.seed +where id >= 10 and id <= 20; +""" + +# valid_to_current fixtures + +snapshots_valid_to_current_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at + dbt_valid_to_current: "date('2099-12-31')" + snapshot_meta_column_names: + dbt_valid_to: test_valid_to + dbt_valid_from: test_valid_from + dbt_scd_id: test_scd_id + dbt_updated_at: test_updated_at +""" + +update_with_current_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + date('2099-12-31') as test_valid_to, + updated_at as test_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as test_scd_id +from {schema}.seed +where id >= 10 and id <= 20; +""" + + +# multi-key snapshot fixtures + +create_multi_key_seed_sql = """ +create table {schema}.seed ( + id1 INTEGER, + id2 INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at TIMESTAMP +); +""" + + +create_multi_key_snapshot_expected_sql = """ +create table {schema}.snapshot_expected ( + id1 INTEGER, + id2 INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at TIMESTAMP, + test_valid_from TIMESTAMP, + test_valid_to TIMESTAMP, + test_scd_id TEXT, + test_updated_at TIMESTAMP +); +""" + +seed_multi_key_insert_sql = """ +-- seed inserts +-- use the same email for two users to verify that duplicated check_cols values +-- are handled appropriately +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 100, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'), +(2, 200, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'), +(3, 300, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'), +(4, 400, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'), +(5, 500, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'), +(6, 600, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'), +(7, 700, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'), +(8, 800, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'), +(9, 900, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'), +(10, 1000, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'), +(11, 1100, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'), +(12, 1200, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'), +(13, 1300, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'), +(14, 1400, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'), +(15, 1500, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'), +(16, 1600, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'), +(17, 1700, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'), +(18, 1800, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'), +(19, 1900, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'), +(20, 2000, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19'); +""" + +populate_multi_key_snapshot_expected_sql = """ +-- populate snapshot table +insert into {schema}.snapshot_expected ( + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + null::timestamp as test_valid_to, + updated_at as test_updated_at, + md5(id1::text || '|' || id2::text || '|' || updated_at::text) as test_scd_id +from {schema}.seed; +""" + +model_seed_sql = """ +select * from {{target.database}}.{{target.schema}}.seed +""" + +snapshots_multi_key_yml = """ +snapshots: + - name: snapshot_actual + relation: "ref('seed')" + config: + strategy: timestamp + updated_at: updated_at + unique_key: + - id1 + - id2 + snapshot_meta_column_names: + dbt_valid_to: test_valid_to + dbt_valid_from: test_valid_from + dbt_scd_id: test_scd_id + dbt_updated_at: test_updated_at +""" + +invalidate_multi_key_sql = """ +-- update records 11 - 21. Change email and updated_at field +update {schema}.seed set + updated_at = updated_at + interval '1 hour', + email = case when id1 = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end +where id1 >= 10 and id1 <= 20; + + +-- invalidate records 11 - 21 +update {schema}.snapshot_expected set + test_valid_to = updated_at + interval '1 hour' +where id1 >= 10 and id1 <= 20; + +""" + +update_multi_key_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {schema}.snapshot_expected ( + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + null::timestamp as test_valid_to, + updated_at as test_updated_at, + md5(id1::text || '|' || id2::text || '|' || updated_at::text) as test_scd_id +from {schema}.seed +where id1 >= 10 and id1 <= 20; +""" diff --git a/tests/functional/adapter/simple_snapshot/test_various_configs.py b/tests/functional/adapter/simple_snapshot/test_various_configs.py new file mode 100644 index 00000000000..d288d2934df --- /dev/null +++ b/tests/functional/adapter/simple_snapshot/test_various_configs.py @@ -0,0 +1,277 @@ +import datetime + +import pytest + +from dbt.tests.util import ( + check_relations_equal, + get_manifest, + run_dbt, + run_dbt_and_capture, + run_sql_with_adapter, + update_config_file, +) +from tests.functional.adapter.simple_snapshot.fixtures import ( + create_multi_key_seed_sql, + create_multi_key_snapshot_expected_sql, + create_seed_sql, + create_snapshot_expected_sql, + invalidate_multi_key_sql, + invalidate_sql, + model_seed_sql, + populate_multi_key_snapshot_expected_sql, + populate_snapshot_expected_sql, + populate_snapshot_expected_valid_to_current_sql, + ref_snapshot_sql, + seed_insert_sql, + seed_multi_key_insert_sql, + snapshot_actual_sql, + snapshots_multi_key_yml, + snapshots_no_column_names_yml, + snapshots_valid_to_current_yml, + snapshots_yml, + update_multi_key_sql, + update_sql, + update_with_current_sql, +) + + +class BaseSnapshotColumnNames: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def test_snapshot_column_names(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_sql) + project.run_sql(update_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # run_dbt(["test"]) + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + +class TestSnapshotColumnNames(BaseSnapshotColumnNames): + pass + + +class BaseSnapshotColumnNamesFromDbtProject: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_no_column_names_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "snapshots": { + "test": { + "+snapshot_meta_column_names": { + "dbt_valid_to": "test_valid_to", + "dbt_valid_from": "test_valid_from", + "dbt_scd_id": "test_scd_id", + "dbt_updated_at": "test_updated_at", + } + } + } + } + + def test_snapshot_column_names_from_project(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_sql) + project.run_sql(update_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # run_dbt(["test"]) + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + +class TestSnapshotColumnNamesFromDbtProject(BaseSnapshotColumnNamesFromDbtProject): + pass + + +class BaseSnapshotInvalidColumnNames: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_no_column_names_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "snapshots": { + "test": { + "+snapshot_meta_column_names": { + "dbt_valid_to": "test_valid_to", + "dbt_valid_from": "test_valid_from", + "dbt_scd_id": "test_scd_id", + "dbt_updated_at": "test_updated_at", + } + } + } + } + + def test_snapshot_invalid_column_names(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + manifest = get_manifest(project.project_root) + snapshot_node = manifest.nodes["snapshot.test.snapshot_actual"] + snapshot_node.config.snapshot_meta_column_names == { + "dbt_valid_to": "test_valid_to", + "dbt_valid_from": "test_valid_from", + "dbt_scd_id": "test_scd_id", + "dbt_updated_at": "test_updated_at", + } + + project.run_sql(invalidate_sql) + project.run_sql(update_sql) + + # Change snapshot_meta_columns and look for an error + different_columns = { + "snapshots": { + "test": { + "+snapshot_meta_column_names": { + "dbt_valid_to": "test_valid_to", + "dbt_updated_at": "test_updated_at", + } + } + } + } + update_config_file(different_columns, "dbt_project.yml") + + results, log_output = run_dbt_and_capture(["snapshot"], expect_pass=False) + assert len(results) == 1 + assert "Compilation Error in snapshot snapshot_actual" in log_output + assert "Snapshot target is missing configured columns" in log_output + + +class TestSnapshotInvalidColumnNames(BaseSnapshotInvalidColumnNames): + pass + + +class BaseSnapshotDbtValidToCurrent: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_valid_to_current_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def test_valid_to_current(self, project): + project.run_sql(create_seed_sql) + project.run_sql(create_snapshot_expected_sql) + project.run_sql(seed_insert_sql) + project.run_sql(populate_snapshot_expected_valid_to_current_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + original_snapshot = run_sql_with_adapter( + project.adapter, + "select id, test_scd_id, test_valid_to from {schema}.snapshot_actual", + "all", + ) + assert original_snapshot[0][2] == datetime.datetime(2099, 12, 31, 0, 0) + assert original_snapshot[9][2] == datetime.datetime(2099, 12, 31, 0, 0) + + project.run_sql(invalidate_sql) + project.run_sql(update_with_current_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + updated_snapshot = run_sql_with_adapter( + project.adapter, + "select id, test_scd_id, test_valid_to from {schema}.snapshot_actual", + "all", + ) + assert updated_snapshot[0][2] == datetime.datetime(2099, 12, 31, 0, 0) + # Original row that was updated now has a non-current (2099/12/31) date + assert updated_snapshot[9][2] == datetime.datetime(2016, 8, 20, 16, 44, 49) + # Updated row has a current date + assert updated_snapshot[20][2] == datetime.datetime(2099, 12, 31, 0, 0) + + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + +class TestSnapshotDbtValidToCurrent(BaseSnapshotDbtValidToCurrent): + pass + + +# This uses snapshot_meta_column_names, yaml-only snapshot def, +# and multiple keys +class BaseSnapshotMultiUniqueKey: + @pytest.fixture(scope="class") + def models(self): + return { + "seed.sql": model_seed_sql, + "snapshots.yml": snapshots_multi_key_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def test_multi_column_unique_key(self, project): + project.run_sql(create_multi_key_seed_sql) + project.run_sql(create_multi_key_snapshot_expected_sql) + project.run_sql(seed_multi_key_insert_sql) + project.run_sql(populate_multi_key_snapshot_expected_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_multi_key_sql) + project.run_sql(update_multi_key_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # run_dbt(["test"]) + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + +class TestSnapshotMultiUniqueKey(BaseSnapshotMultiUniqueKey): + pass diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 71c8588b17f..02c6976c848 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -162,11 +162,8 @@ def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strateg with mock.patch.object( type(project.adapter), "valid_incremental_strategies", lambda _: [] ): - # Initial run - with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run"]) - - # Incremental run fails + # Run of microbatch model while adapter doesn't have a "valid" + # microbatch strategy causes an error to be raised with patch_microbatch_end_time("2020-01-03 13:57:00"): _, logs = run_dbt_and_capture(["run"], expect_pass=False) assert "'microbatch' is not valid" in logs diff --git a/tests/functional/run_operations/test_run_operations.py b/tests/functional/run_operations/test_run_operations.py index 064c98b3a51..258ed679d7e 100644 --- a/tests/functional/run_operations/test_run_operations.py +++ b/tests/functional/run_operations/test_run_operations.py @@ -3,6 +3,7 @@ import pytest import yaml +from dbt.artifacts.schemas.results import RunStatus from dbt.tests.util import ( check_table_does_exist, mkdir, @@ -135,9 +136,25 @@ def test_run_operation_local_macro(self, project): run_dbt(["deps"]) results, log_output = run_dbt_and_capture(["run-operation", "something_cool"]) + + for result in results: + if result.status == RunStatus.Skipped: + continue + + timing_keys = [timing.name for timing in result.timing] + assert timing_keys == ["compile", "execute"] + assert "something cool" in log_output results, log_output = run_dbt_and_capture(["run-operation", "pkg.something_cool"]) + + for result in results: + if result.status == RunStatus.Skipped: + continue + + timing_keys = [timing.name for timing in result.timing] + assert timing_keys == ["compile", "execute"] + assert "something cool" in log_output rm_dir("pkg") diff --git a/tests/functional/snapshots/fixtures.py b/tests/functional/snapshots/fixtures.py index 5b3182098d2..562fcb4b10f 100644 --- a/tests/functional/snapshots/fixtures.py +++ b/tests/functional/snapshots/fixtures.py @@ -292,7 +292,6 @@ """ snapshots_pg__snapshot_yml = """ -version: 2 snapshots: - name: snapshot_actual relation: "ref('seed')" @@ -304,6 +303,18 @@ owner: 'a_owner' """ +snapshots_pg__snapshot_mod_yml = """ +snapshots: + - name: snapshot_actual + relation: "ref('seed')" + config: + unique_key: "id || '-' || first_name" + strategy: timestamp + updated_at: updated_at + meta: + owner: 'b_owner' +""" + snapshots_pg__snapshot_no_target_schema_sql = """ {% snapshot snapshot_actual %} diff --git a/tests/functional/snapshots/test_basic_snapshot.py b/tests/functional/snapshots/test_basic_snapshot.py index b5a508b04a9..5300f921971 100644 --- a/tests/functional/snapshots/test_basic_snapshot.py +++ b/tests/functional/snapshots/test_basic_snapshot.py @@ -8,6 +8,7 @@ check_relations_equal, relation_from_name, run_dbt, + update_config_file, write_file, ) from tests.functional.snapshots.fixtures import ( @@ -18,6 +19,7 @@ models__schema_yml, seeds__seed_csv, seeds__seed_newcol_csv, + snapshots_pg__snapshot_mod_yml, snapshots_pg__snapshot_no_target_schema_sql, snapshots_pg__snapshot_sql, snapshots_pg__snapshot_yml, @@ -394,3 +396,34 @@ def models(self): class TestBasicSnapshotYaml(BasicYaml): def test_basic_snapshot_yaml(self, project): snapshot_setup(project, num_snapshot_models=1) + + +class TestYamlSnapshotPartialParsing(BasicYaml): + def test_snapshot_partial_parsing(self, project): + manifest = run_dbt(["parse"]) + snapshot_id = "snapshot.test.snapshot_actual" + assert snapshot_id in manifest.nodes + snapshot = manifest.nodes[snapshot_id] + assert snapshot.meta["owner"] == "a_owner" + + # change snapshot yml file and re-parse + write_file(snapshots_pg__snapshot_mod_yml, "snapshots", "snapshot.yml") + manifest = run_dbt(["parse"]) + snapshot = manifest.nodes[snapshot_id] + assert snapshot.meta["owner"] == "b_owner" + + # modify dbt_project.yml and re-parse + config_updates = { + "snapshots": { + "test": { + "+snapshot_meta_column_names": { + "dbt_valid_to": "test_valid_to", + "dbt_valid_from": "test_valid_from", + }, + } + } + } + update_config_file(config_updates, "dbt_project.yml") + manifest = run_dbt(["parse"]) + snapshot = manifest.nodes[snapshot_id] + assert snapshot.config.snapshot_meta_column_names.dbt_valid_to == "test_valid_to" diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py index 085f849492e..f6ac66f0034 100644 --- a/tests/unit/test_events.py +++ b/tests/unit/test_events.py @@ -517,7 +517,7 @@ def test_all_serializable(self): def test_date_serialization(): - ti = TimingInfo("test") + ti = TimingInfo("compile") ti.begin() ti.end() ti_dict = ti.to_dict()