From 6c1822f1869869df301795c0b0b1e72f393540fe Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Mon, 6 Nov 2023 16:55:36 -0500 Subject: [PATCH] Fix back compat for run_results pre-v5 (#9009) * Fix back compat for run_results pre-v5 * Add type annotations * Add functional testing * Add inline annotations * Add changelog entry. * Consolidate upgrade_schema_version + upgrade_run_results_json * Restore accidentally reverted test cases * Pre-commit fixups --------- Co-authored-by: Peter Allen Webb --- .../unreleased/Fixes-20231106-155933.yaml | 7 ++ core/dbt/contracts/graph/manifest.py | 28 +++----- core/dbt/contracts/results.py | 24 +++++++ core/dbt/contracts/util.py | 15 ++++ .../data/results/v4/run_results.json | 1 + .../data/results/v5/run_results.json | 1 + .../artifacts/test_previous_version_state.py | 69 +++++++++++++++++-- 7 files changed, 120 insertions(+), 25 deletions(-) create mode 100644 .changes/unreleased/Fixes-20231106-155933.yaml create mode 100644 tests/functional/artifacts/data/results/v4/run_results.json create mode 100644 tests/functional/artifacts/data/results/v5/run_results.json diff --git a/.changes/unreleased/Fixes-20231106-155933.yaml b/.changes/unreleased/Fixes-20231106-155933.yaml new file mode 100644 index 00000000000..9e4201e4962 --- /dev/null +++ b/.changes/unreleased/Fixes-20231106-155933.yaml @@ -0,0 +1,7 @@ +kind: Fixes +body: Update run_results.json from previous versions of dbt to support deferral and + rerun from failure +time: 2023-11-06T15:59:33.677915-05:00 +custom: + Author: jtcohen6 peterallenwebb + Issue: "9010" diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 7f6c16cdca1..37ddf73cb0e 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -20,6 +20,7 @@ Generic, AbstractSet, ClassVar, + Iterable, ) from typing_extensions import Protocol from uuid import UUID @@ -45,7 +46,13 @@ from dbt.contracts.graph.unparsed import SourcePatch, NodeVersion, UnparsedVersion from dbt.contracts.graph.manifest_upgrade import upgrade_manifest_json from dbt.contracts.files import SourceFile, SchemaSourceFile, FileHash, AnySourceFile -from dbt.contracts.util import BaseArtifactMetadata, SourceKey, ArtifactMixin, schema_version +from dbt.contracts.util import ( + BaseArtifactMetadata, + SourceKey, + ArtifactMixin, + schema_version, + get_artifact_schema_version, +) from dbt.dataclass_schema import dbtClassMixin from dbt.exceptions import ( CompilationError, @@ -1602,7 +1609,7 @@ class WritableManifest(ArtifactMixin): ) @classmethod - def compatible_previous_versions(self): + def compatible_previous_versions(cls) -> Iterable[Tuple[str, int]]: return [ ("manifest", 4), ("manifest", 5), @@ -1617,7 +1624,7 @@ def compatible_previous_versions(self): def upgrade_schema_version(cls, data): """This overrides the "upgrade_schema_version" call in VersionedSchema (via ArtifactMixin) to modify the dictionary passed in from earlier versions of the manifest.""" - manifest_schema_version = get_manifest_schema_version(data) + manifest_schema_version = get_artifact_schema_version(data) if manifest_schema_version <= 10: data = upgrade_manifest_json(data, manifest_schema_version) return cls.from_dict(data) @@ -1631,21 +1638,6 @@ def __post_serialize__(self, dct): return dct -def get_manifest_schema_version(dct: dict) -> int: - schema_version = dct.get("metadata", {}).get("dbt_schema_version", None) - if not schema_version: - raise ValueError("Manifest doesn't have schema version") - - # schema_version is in this format: https://schemas.getdbt.com/dbt/manifest/v10.json - # What the code below is doing: - # 1. Split on "/" – v10.json - # 2. Split on "." – v10 - # 3. Skip first character – 10 - # 4. Convert to int - # TODO: If this gets more complicated, turn into a regex - return int(schema_version.split("/")[-1].split(".")[0][1:]) - - def _check_duplicates(value: BaseNode, src: Mapping[str, BaseNode]): if value.unique_id in src: raise DuplicateResourceNameError(value, src[value.unique_id]) diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index 2278e5561a6..a94abe0dfda 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -8,6 +8,7 @@ VersionedSchema, Replaceable, schema_version, + get_artifact_schema_version, ) from dbt.exceptions import DbtInternalError from dbt.events.functions import fire_event @@ -31,6 +32,8 @@ Optional, Sequence, Union, + Iterable, + Tuple, ) from dbt.clients.system import write_json @@ -268,6 +271,27 @@ def from_execution_results( ) return cls(metadata=meta, results=processed_results, elapsed_time=elapsed_time, args=args) + @classmethod + def compatible_previous_versions(cls) -> Iterable[Tuple[str, int]]: + return [ + ("run-results", 4), + ] + + @classmethod + def upgrade_schema_version(cls, data): + """This overrides the "upgrade_schema_version" call in VersionedSchema (via + ArtifactMixin) to modify the dictionary passed in from earlier versions of the run_results.""" + run_results_schema_version = get_artifact_schema_version(data) + # If less than the current version (v5), preprocess contents to match latest schema version + if run_results_schema_version <= 5: + # In v5, we added 'compiled' attributes to each result entry + # Going forward, dbt expects these to be populated + for result in data["results"]: + result["compiled"] = False + result["compiled_code"] = "" + result["relation_name"] = "" + return cls.from_dict(data) + def write(self, path: str): write_json(path, self.to_dict(omit_none=False)) diff --git a/core/dbt/contracts/util.py b/core/dbt/contracts/util.py index e632203c0b1..8f31b950dfe 100644 --- a/core/dbt/contracts/util.py +++ b/core/dbt/contracts/util.py @@ -261,6 +261,21 @@ def validate(cls, data): raise DbtInternalError("Cannot call from_dict with no schema version!") +def get_artifact_schema_version(dct: dict) -> int: + schema_version = dct.get("metadata", {}).get("dbt_schema_version", None) + if not schema_version: + raise ValueError("Artifact is missing schema version") + + # schema_version is in this format: https://schemas.getdbt.com/dbt/manifest/v10.json + # What the code below is doing: + # 1. Split on "/" – v10.json + # 2. Split on "." – v10 + # 3. Skip first character – 10 + # 4. Convert to int + # TODO: If this gets more complicated, turn into a regex + return int(schema_version.split("/")[-1].split(".")[0][1:]) + + class Identifier(ValidatedStringMixin): """Our definition of a valid Identifier is the same as what's valid for an unquoted database table name. diff --git a/tests/functional/artifacts/data/results/v4/run_results.json b/tests/functional/artifacts/data/results/v4/run_results.json new file mode 100644 index 00000000000..0767eb8e801 --- /dev/null +++ b/tests/functional/artifacts/data/results/v4/run_results.json @@ -0,0 +1 @@ +{"metadata": {"dbt_schema_version": "https://schemas.getdbt.com/dbt/run-results/v4.json", "dbt_version": "1.6.7", "generated_at": "2023-11-06T20:40:37.557735Z", "invocation_id": "42f85a60-4f7b-4cc1-a197-62687104fecc", "env": {}}, "results": [{"status": "success", "timing": [{"name": "compile", "started_at": "2023-11-06T20:40:37.486980Z", "completed_at": "2023-11-06T20:40:37.488837Z"}, {"name": "execute", "started_at": "2023-11-06T20:40:37.490290Z", "completed_at": "2023-11-06T20:40:37.539787Z"}], "thread_id": "Thread-9 (worker)", "execution_time": 0.0566411018371582, "adapter_response": {"_message": "CREATE VIEW", "code": "CREATE VIEW", "rows_affected": -1}, "message": "CREATE VIEW", "failures": null, "unique_id": "model.test.my_model"}, {"status": "success", "timing": [{"name": "compile", "started_at": "2023-11-06T20:40:37.485334Z", "completed_at": "2023-11-06T20:40:37.489266Z"}, {"name": "execute", "started_at": "2023-11-06T20:40:37.494545Z", "completed_at": "2023-11-06T20:40:37.542811Z"}], "thread_id": "Thread-8 (worker)", "execution_time": 0.060118675231933594, "adapter_response": {"_message": "CREATE VIEW", "code": "CREATE VIEW", "rows_affected": -1}, "message": "CREATE VIEW", "failures": null, "unique_id": "model.test.metricflow_time_spine"}], "elapsed_time": 0.18144583702087402, "args": {"defer": false, "indirect_selection": "eager", "select": [], "log_level_file": "debug", "use_colors": true, "cache_selected_only": false, "strict_mode": false, "use_colors_file": true, "partial_parse_file_diff": true, "static_parser": true, "write_json": true, "warn_error_options": {"include": [], "exclude": []}, "print": true, "log_level": "info", "profiles_dir": "/private/var/folders/7h/hj5_fw9j291c58hwfdvy5xbm0000gp/T/pytest-of-jerco/pytest-16/profile0", "log_path": "/Users/jerco/dev/product/dbt-core/logs/test16993032361853467608", "partial_parse": true, "quiet": false, "log_format_file": "debug", "version_check": true, "send_anonymous_usage_stats": false, "project_dir": "/private/var/folders/7h/hj5_fw9j291c58hwfdvy5xbm0000gp/T/pytest-of-jerco/pytest-16/project0", "log_format": "default", "enable_legacy_logger": false, "exclude": [], "populate_cache": true, "log_file_max_bytes": 10485760, "macro_debugging": false, "printer_width": 80, "invocation_command": "dbt tests/functional/artifacts/test_previous_version_state.py::TestPreviousVersionState", "which": "run", "favor_state": false, "introspect": true, "vars": {}}} diff --git a/tests/functional/artifacts/data/results/v5/run_results.json b/tests/functional/artifacts/data/results/v5/run_results.json new file mode 100644 index 00000000000..63a7a58eabc --- /dev/null +++ b/tests/functional/artifacts/data/results/v5/run_results.json @@ -0,0 +1 @@ +{"metadata": {"dbt_schema_version": "https://schemas.getdbt.com/dbt/run-results/v5.json", "dbt_version": "1.8.0a1", "generated_at": "2023-11-06T20:43:08.231028Z", "invocation_id": "a9238a29-6764-47f0-ba7d-f7d61ae5e6c0", "env": {}}, "results": [{"status": "success", "timing": [{"name": "compile", "started_at": "2023-11-06T20:43:08.146847Z", "completed_at": "2023-11-06T20:43:08.149862Z"}, {"name": "execute", "started_at": "2023-11-06T20:43:08.151676Z", "completed_at": "2023-11-06T20:43:08.206208Z"}], "thread_id": "Thread-9 (worker)", "execution_time": 0.06433510780334473, "adapter_response": {"_message": "CREATE VIEW", "code": "CREATE VIEW", "rows_affected": -1}, "message": "CREATE VIEW", "failures": null, "unique_id": "model.test.my_model", "compiled": true, "compiled_code": "select 1 as id", "relation_name": "\"dbt\".\"test16993033859513627134_test_previous_version_state\".\"my_model\""}, {"status": "success", "timing": [{"name": "compile", "started_at": "2023-11-06T20:43:08.144982Z", "completed_at": "2023-11-06T20:43:08.150320Z"}, {"name": "execute", "started_at": "2023-11-06T20:43:08.155222Z", "completed_at": "2023-11-06T20:43:08.209881Z"}], "thread_id": "Thread-8 (worker)", "execution_time": 0.06822013854980469, "adapter_response": {"_message": "CREATE VIEW", "code": "CREATE VIEW", "rows_affected": -1}, "message": "CREATE VIEW", "failures": null, "unique_id": "model.test.metricflow_time_spine", "compiled": true, "compiled_code": "SELECT to_date('02/20/2023', 'mm/dd/yyyy') as date_day", "relation_name": "\"dbt\".\"test16993033859513627134_test_previous_version_state\".\"metricflow_time_spine\""}], "elapsed_time": 0.18284392356872559, "args": {"send_anonymous_usage_stats": false, "profiles_dir": "/private/var/folders/7h/hj5_fw9j291c58hwfdvy5xbm0000gp/T/pytest-of-jerco/pytest-19/profile0", "static_parser": true, "partial_parse_file_diff": true, "printer_width": 80, "log_level_file": "debug", "project_dir": "/private/var/folders/7h/hj5_fw9j291c58hwfdvy5xbm0000gp/T/pytest-of-jerco/pytest-19/project0", "log_format": "default", "strict_mode": false, "macro_debugging": false, "indirect_selection": "eager", "version_check": true, "use_colors_file": true, "select": [], "log_file_max_bytes": 10485760, "warn_error_options": {"include": [], "exclude": []}, "log_format_file": "debug", "invocation_command": "dbt tests/functional/artifacts/test_previous_version_state.py::TestPreviousVersionState", "write_json": true, "log_level": "info", "cache_selected_only": false, "quiet": false, "favor_state": false, "enable_legacy_logger": false, "log_path": "/Users/jerco/dev/product/dbt-core/logs/test16993033859513627134", "which": "run", "partial_parse": true, "introspect": true, "show_resource_report": false, "exclude": [], "populate_cache": true, "vars": {}, "use_colors": true, "defer": false, "print": true}} diff --git a/tests/functional/artifacts/test_previous_version_state.py b/tests/functional/artifacts/test_previous_version_state.py index 3e7b047c2e2..b455c52842b 100644 --- a/tests/functional/artifacts/test_previous_version_state.py +++ b/tests/functional/artifacts/test_previous_version_state.py @@ -4,7 +4,9 @@ import pytest -from dbt.contracts.graph.manifest import WritableManifest, get_manifest_schema_version +from dbt.contracts.util import get_artifact_schema_version +from dbt.contracts.graph.manifest import WritableManifest +from dbt.contracts.results import RunResultsArtifact from dbt.exceptions import IncompatibleSchemaError from dbt.tests.util import run_dbt, get_manifest @@ -261,6 +263,7 @@ class TestPreviousVersionState: CURRENT_EXPECTED_MANIFEST_VERSION = 11 + CURRENT_EXPECTED_RUN_RESULTS_VERSION = 5 @pytest.fixture(scope="class") def models(self): @@ -330,13 +333,27 @@ def generate_latest_manifest( project, current_manifest_version, ): - run_dbt(["list"]) + run_dbt(["parse"]) source_path = os.path.join(project.project_root, "target/manifest.json") state_path = os.path.join(project.test_data_dir, f"state/v{current_manifest_version}") target_path = os.path.join(state_path, "manifest.json") os.makedirs(state_path, exist_ok=True) shutil.copyfile(source_path, target_path) + # Use this method when generating a new run_results version for the first time. + # Once generated, we shouldn't need to re-generate or modify the manifest. + def generate_latest_run_results( + self, + project, + current_run_results_version, + ): + run_dbt(["run"]) + source_path = os.path.join(project.project_root, "target/run_results.json") + state_path = os.path.join(project.test_data_dir, f"results/v{current_run_results_version}") + target_path = os.path.join(state_path, "run_results.json") + os.makedirs(state_path, exist_ok=True) + shutil.copyfile(source_path, target_path) + # The actual test method. Run `dbt list --select state:modified --state ...` # once for each past manifest version. They all have the same content, but different # schema/structure, only some of which are forward-compatible with the @@ -365,14 +382,38 @@ def compare_previous_state( with pytest.raises(IncompatibleSchemaError): run_dbt(cli_args, expect_pass=expect_pass) + # The actual test method. Run `dbt retry --state ...` + # once for each past run_results version. They all have the same content, but different + # schema/structure, only some of which are forward-compatible with the + # current WritableManifest class. + def compare_previous_results( + self, + project, + compare_run_results_version, + expect_pass, + num_results, + ): + state_path = os.path.join(project.test_data_dir, f"results/v{compare_run_results_version}") + cli_args = [ + "retry", + "--state", + state_path, + ] + if expect_pass: + results = run_dbt(cli_args, expect_pass=expect_pass) + assert len(results) == num_results + else: + with pytest.raises(IncompatibleSchemaError): + run_dbt(cli_args, expect_pass=expect_pass) + def test_compare_state_current(self, project): - current_schema_version = WritableManifest.dbt_schema_version.version + current_manifest_schema_version = WritableManifest.dbt_schema_version.version assert ( - current_schema_version == self.CURRENT_EXPECTED_MANIFEST_VERSION + current_manifest_schema_version == self.CURRENT_EXPECTED_MANIFEST_VERSION ), "Sounds like you've bumped the manifest version and need to update this test!" # If we need a newly generated manifest, uncomment the following line and commit the result - # self.generate_latest_manifest(project, current_schema_version) - self.compare_previous_state(project, current_schema_version, True, 0) + # self.generate_latest_manifest(project, current_manifest_schema_version) + self.compare_previous_state(project, current_manifest_schema_version, True, 0) def test_backwards_compatible_versions(self, project): # manifest schema version 4 and greater should always be forward compatible @@ -393,5 +434,19 @@ def test_get_manifest_schema_version(self, project): ) manifest = json.load(open(manifest_path)) - manifest_version = get_manifest_schema_version(manifest) + manifest_version = get_artifact_schema_version(manifest) assert manifest_version == schema_version + + def test_compare_results_current(self, project): + current_run_results_schema_version = RunResultsArtifact.dbt_schema_version.version + assert ( + current_run_results_schema_version == self.CURRENT_EXPECTED_RUN_RESULTS_VERSION + ), "Sounds like you've bumped the run_results version and need to update this test!" + # If we need a newly generated run_results, uncomment the following line and commit the result + # self.generate_latest_run_results(project, current_run_results_schema_version) + self.compare_previous_results(project, current_run_results_schema_version, True, 0) + + def test_backwards_compatible_run_results_versions(self, project): + # run_results schema version 4 and greater should always be forward compatible + for schema_version in range(4, self.CURRENT_EXPECTED_RUN_RESULTS_VERSION): + self.compare_previous_results(project, schema_version, True, 0)