Skip to content

Commit

Permalink
Fix back compat for run_results pre-v5 (#9009)
Browse files Browse the repository at this point in the history
* Fix back compat for run_results pre-v5

* Add type annotations

* Add functional testing

* Add inline annotations

* Add changelog entry.

* Consolidate upgrade_schema_version + upgrade_run_results_json

* Restore accidentally reverted test cases

* Pre-commit fixups

---------

Co-authored-by: Peter Allen Webb <[email protected]>
  • Loading branch information
jtcohen6 and peterallenwebb authored Nov 6, 2023
1 parent c7c3ac8 commit 6c1822f
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 25 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20231106-155933.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Update run_results.json from previous versions of dbt to support deferral and
rerun from failure
time: 2023-11-06T15:59:33.677915-05:00
custom:
Author: jtcohen6 peterallenwebb
Issue: "9010"
28 changes: 10 additions & 18 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Generic,
AbstractSet,
ClassVar,
Iterable,
)
from typing_extensions import Protocol
from uuid import UUID
Expand All @@ -45,7 +46,13 @@
from dbt.contracts.graph.unparsed import SourcePatch, NodeVersion, UnparsedVersion
from dbt.contracts.graph.manifest_upgrade import upgrade_manifest_json
from dbt.contracts.files import SourceFile, SchemaSourceFile, FileHash, AnySourceFile
from dbt.contracts.util import BaseArtifactMetadata, SourceKey, ArtifactMixin, schema_version
from dbt.contracts.util import (
BaseArtifactMetadata,
SourceKey,
ArtifactMixin,
schema_version,
get_artifact_schema_version,
)
from dbt.dataclass_schema import dbtClassMixin
from dbt.exceptions import (
CompilationError,
Expand Down Expand Up @@ -1602,7 +1609,7 @@ class WritableManifest(ArtifactMixin):
)

@classmethod
def compatible_previous_versions(self):
def compatible_previous_versions(cls) -> Iterable[Tuple[str, int]]:
return [
("manifest", 4),
("manifest", 5),
Expand All @@ -1617,7 +1624,7 @@ def compatible_previous_versions(self):
def upgrade_schema_version(cls, data):
"""This overrides the "upgrade_schema_version" call in VersionedSchema (via
ArtifactMixin) to modify the dictionary passed in from earlier versions of the manifest."""
manifest_schema_version = get_manifest_schema_version(data)
manifest_schema_version = get_artifact_schema_version(data)
if manifest_schema_version <= 10:
data = upgrade_manifest_json(data, manifest_schema_version)
return cls.from_dict(data)
Expand All @@ -1631,21 +1638,6 @@ def __post_serialize__(self, dct):
return dct


def get_manifest_schema_version(dct: dict) -> int:
schema_version = dct.get("metadata", {}).get("dbt_schema_version", None)
if not schema_version:
raise ValueError("Manifest doesn't have schema version")

# schema_version is in this format: https://schemas.getdbt.com/dbt/manifest/v10.json
# What the code below is doing:
# 1. Split on "/" – v10.json
# 2. Split on "." – v10
# 3. Skip first character – 10
# 4. Convert to int
# TODO: If this gets more complicated, turn into a regex
return int(schema_version.split("/")[-1].split(".")[0][1:])


def _check_duplicates(value: BaseNode, src: Mapping[str, BaseNode]):
if value.unique_id in src:
raise DuplicateResourceNameError(value, src[value.unique_id])
Expand Down
24 changes: 24 additions & 0 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
VersionedSchema,
Replaceable,
schema_version,
get_artifact_schema_version,
)
from dbt.exceptions import DbtInternalError
from dbt.events.functions import fire_event
Expand All @@ -31,6 +32,8 @@
Optional,
Sequence,
Union,
Iterable,
Tuple,
)

from dbt.clients.system import write_json
Expand Down Expand Up @@ -268,6 +271,27 @@ def from_execution_results(
)
return cls(metadata=meta, results=processed_results, elapsed_time=elapsed_time, args=args)

@classmethod
def compatible_previous_versions(cls) -> Iterable[Tuple[str, int]]:
return [
("run-results", 4),
]

@classmethod
def upgrade_schema_version(cls, data):
"""This overrides the "upgrade_schema_version" call in VersionedSchema (via
ArtifactMixin) to modify the dictionary passed in from earlier versions of the run_results."""
run_results_schema_version = get_artifact_schema_version(data)
# If less than the current version (v5), preprocess contents to match latest schema version
if run_results_schema_version <= 5:
# In v5, we added 'compiled' attributes to each result entry
# Going forward, dbt expects these to be populated
for result in data["results"]:
result["compiled"] = False
result["compiled_code"] = ""
result["relation_name"] = ""
return cls.from_dict(data)

def write(self, path: str):
write_json(path, self.to_dict(omit_none=False))

Expand Down
15 changes: 15 additions & 0 deletions core/dbt/contracts/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,21 @@ def validate(cls, data):
raise DbtInternalError("Cannot call from_dict with no schema version!")


def get_artifact_schema_version(dct: dict) -> int:
schema_version = dct.get("metadata", {}).get("dbt_schema_version", None)
if not schema_version:
raise ValueError("Artifact is missing schema version")

# schema_version is in this format: https://schemas.getdbt.com/dbt/manifest/v10.json
# What the code below is doing:
# 1. Split on "/" – v10.json
# 2. Split on "." – v10
# 3. Skip first character – 10
# 4. Convert to int
# TODO: If this gets more complicated, turn into a regex
return int(schema_version.split("/")[-1].split(".")[0][1:])


class Identifier(ValidatedStringMixin):
"""Our definition of a valid Identifier is the same as what's valid for an unquoted database table name.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"metadata": {"dbt_schema_version": "https://schemas.getdbt.com/dbt/run-results/v4.json", "dbt_version": "1.6.7", "generated_at": "2023-11-06T20:40:37.557735Z", "invocation_id": "42f85a60-4f7b-4cc1-a197-62687104fecc", "env": {}}, "results": [{"status": "success", "timing": [{"name": "compile", "started_at": "2023-11-06T20:40:37.486980Z", "completed_at": "2023-11-06T20:40:37.488837Z"}, {"name": "execute", "started_at": "2023-11-06T20:40:37.490290Z", "completed_at": "2023-11-06T20:40:37.539787Z"}], "thread_id": "Thread-9 (worker)", "execution_time": 0.0566411018371582, "adapter_response": {"_message": "CREATE VIEW", "code": "CREATE VIEW", "rows_affected": -1}, "message": "CREATE VIEW", "failures": null, "unique_id": "model.test.my_model"}, {"status": "success", "timing": [{"name": "compile", "started_at": "2023-11-06T20:40:37.485334Z", "completed_at": "2023-11-06T20:40:37.489266Z"}, {"name": "execute", "started_at": "2023-11-06T20:40:37.494545Z", "completed_at": "2023-11-06T20:40:37.542811Z"}], "thread_id": "Thread-8 (worker)", "execution_time": 0.060118675231933594, "adapter_response": {"_message": "CREATE VIEW", "code": "CREATE VIEW", "rows_affected": -1}, "message": "CREATE VIEW", "failures": null, "unique_id": "model.test.metricflow_time_spine"}], "elapsed_time": 0.18144583702087402, "args": {"defer": false, "indirect_selection": "eager", "select": [], "log_level_file": "debug", "use_colors": true, "cache_selected_only": false, "strict_mode": false, "use_colors_file": true, "partial_parse_file_diff": true, "static_parser": true, "write_json": true, "warn_error_options": {"include": [], "exclude": []}, "print": true, "log_level": "info", "profiles_dir": "/private/var/folders/7h/hj5_fw9j291c58hwfdvy5xbm0000gp/T/pytest-of-jerco/pytest-16/profile0", "log_path": "/Users/jerco/dev/product/dbt-core/logs/test16993032361853467608", "partial_parse": true, "quiet": false, "log_format_file": "debug", "version_check": true, "send_anonymous_usage_stats": false, "project_dir": "/private/var/folders/7h/hj5_fw9j291c58hwfdvy5xbm0000gp/T/pytest-of-jerco/pytest-16/project0", "log_format": "default", "enable_legacy_logger": false, "exclude": [], "populate_cache": true, "log_file_max_bytes": 10485760, "macro_debugging": false, "printer_width": 80, "invocation_command": "dbt tests/functional/artifacts/test_previous_version_state.py::TestPreviousVersionState", "which": "run", "favor_state": false, "introspect": true, "vars": {}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"metadata": {"dbt_schema_version": "https://schemas.getdbt.com/dbt/run-results/v5.json", "dbt_version": "1.8.0a1", "generated_at": "2023-11-06T20:43:08.231028Z", "invocation_id": "a9238a29-6764-47f0-ba7d-f7d61ae5e6c0", "env": {}}, "results": [{"status": "success", "timing": [{"name": "compile", "started_at": "2023-11-06T20:43:08.146847Z", "completed_at": "2023-11-06T20:43:08.149862Z"}, {"name": "execute", "started_at": "2023-11-06T20:43:08.151676Z", "completed_at": "2023-11-06T20:43:08.206208Z"}], "thread_id": "Thread-9 (worker)", "execution_time": 0.06433510780334473, "adapter_response": {"_message": "CREATE VIEW", "code": "CREATE VIEW", "rows_affected": -1}, "message": "CREATE VIEW", "failures": null, "unique_id": "model.test.my_model", "compiled": true, "compiled_code": "select 1 as id", "relation_name": "\"dbt\".\"test16993033859513627134_test_previous_version_state\".\"my_model\""}, {"status": "success", "timing": [{"name": "compile", "started_at": "2023-11-06T20:43:08.144982Z", "completed_at": "2023-11-06T20:43:08.150320Z"}, {"name": "execute", "started_at": "2023-11-06T20:43:08.155222Z", "completed_at": "2023-11-06T20:43:08.209881Z"}], "thread_id": "Thread-8 (worker)", "execution_time": 0.06822013854980469, "adapter_response": {"_message": "CREATE VIEW", "code": "CREATE VIEW", "rows_affected": -1}, "message": "CREATE VIEW", "failures": null, "unique_id": "model.test.metricflow_time_spine", "compiled": true, "compiled_code": "SELECT to_date('02/20/2023', 'mm/dd/yyyy') as date_day", "relation_name": "\"dbt\".\"test16993033859513627134_test_previous_version_state\".\"metricflow_time_spine\""}], "elapsed_time": 0.18284392356872559, "args": {"send_anonymous_usage_stats": false, "profiles_dir": "/private/var/folders/7h/hj5_fw9j291c58hwfdvy5xbm0000gp/T/pytest-of-jerco/pytest-19/profile0", "static_parser": true, "partial_parse_file_diff": true, "printer_width": 80, "log_level_file": "debug", "project_dir": "/private/var/folders/7h/hj5_fw9j291c58hwfdvy5xbm0000gp/T/pytest-of-jerco/pytest-19/project0", "log_format": "default", "strict_mode": false, "macro_debugging": false, "indirect_selection": "eager", "version_check": true, "use_colors_file": true, "select": [], "log_file_max_bytes": 10485760, "warn_error_options": {"include": [], "exclude": []}, "log_format_file": "debug", "invocation_command": "dbt tests/functional/artifacts/test_previous_version_state.py::TestPreviousVersionState", "write_json": true, "log_level": "info", "cache_selected_only": false, "quiet": false, "favor_state": false, "enable_legacy_logger": false, "log_path": "/Users/jerco/dev/product/dbt-core/logs/test16993033859513627134", "which": "run", "partial_parse": true, "introspect": true, "show_resource_report": false, "exclude": [], "populate_cache": true, "vars": {}, "use_colors": true, "defer": false, "print": true}}
69 changes: 62 additions & 7 deletions tests/functional/artifacts/test_previous_version_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import pytest

from dbt.contracts.graph.manifest import WritableManifest, get_manifest_schema_version
from dbt.contracts.util import get_artifact_schema_version
from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.results import RunResultsArtifact
from dbt.exceptions import IncompatibleSchemaError
from dbt.tests.util import run_dbt, get_manifest

Expand Down Expand Up @@ -261,6 +263,7 @@

class TestPreviousVersionState:
CURRENT_EXPECTED_MANIFEST_VERSION = 11
CURRENT_EXPECTED_RUN_RESULTS_VERSION = 5

@pytest.fixture(scope="class")
def models(self):
Expand Down Expand Up @@ -330,13 +333,27 @@ def generate_latest_manifest(
project,
current_manifest_version,
):
run_dbt(["list"])
run_dbt(["parse"])
source_path = os.path.join(project.project_root, "target/manifest.json")
state_path = os.path.join(project.test_data_dir, f"state/v{current_manifest_version}")
target_path = os.path.join(state_path, "manifest.json")
os.makedirs(state_path, exist_ok=True)
shutil.copyfile(source_path, target_path)

# Use this method when generating a new run_results version for the first time.
# Once generated, we shouldn't need to re-generate or modify the manifest.
def generate_latest_run_results(
self,
project,
current_run_results_version,
):
run_dbt(["run"])
source_path = os.path.join(project.project_root, "target/run_results.json")
state_path = os.path.join(project.test_data_dir, f"results/v{current_run_results_version}")
target_path = os.path.join(state_path, "run_results.json")
os.makedirs(state_path, exist_ok=True)
shutil.copyfile(source_path, target_path)

# The actual test method. Run `dbt list --select state:modified --state ...`
# once for each past manifest version. They all have the same content, but different
# schema/structure, only some of which are forward-compatible with the
Expand Down Expand Up @@ -365,14 +382,38 @@ def compare_previous_state(
with pytest.raises(IncompatibleSchemaError):
run_dbt(cli_args, expect_pass=expect_pass)

# The actual test method. Run `dbt retry --state ...`
# once for each past run_results version. They all have the same content, but different
# schema/structure, only some of which are forward-compatible with the
# current WritableManifest class.
def compare_previous_results(
self,
project,
compare_run_results_version,
expect_pass,
num_results,
):
state_path = os.path.join(project.test_data_dir, f"results/v{compare_run_results_version}")
cli_args = [
"retry",
"--state",
state_path,
]
if expect_pass:
results = run_dbt(cli_args, expect_pass=expect_pass)
assert len(results) == num_results
else:
with pytest.raises(IncompatibleSchemaError):
run_dbt(cli_args, expect_pass=expect_pass)

def test_compare_state_current(self, project):
current_schema_version = WritableManifest.dbt_schema_version.version
current_manifest_schema_version = WritableManifest.dbt_schema_version.version
assert (
current_schema_version == self.CURRENT_EXPECTED_MANIFEST_VERSION
current_manifest_schema_version == self.CURRENT_EXPECTED_MANIFEST_VERSION
), "Sounds like you've bumped the manifest version and need to update this test!"
# If we need a newly generated manifest, uncomment the following line and commit the result
# self.generate_latest_manifest(project, current_schema_version)
self.compare_previous_state(project, current_schema_version, True, 0)
# self.generate_latest_manifest(project, current_manifest_schema_version)
self.compare_previous_state(project, current_manifest_schema_version, True, 0)

def test_backwards_compatible_versions(self, project):
# manifest schema version 4 and greater should always be forward compatible
Expand All @@ -393,5 +434,19 @@ def test_get_manifest_schema_version(self, project):
)
manifest = json.load(open(manifest_path))

manifest_version = get_manifest_schema_version(manifest)
manifest_version = get_artifact_schema_version(manifest)
assert manifest_version == schema_version

def test_compare_results_current(self, project):
current_run_results_schema_version = RunResultsArtifact.dbt_schema_version.version
assert (
current_run_results_schema_version == self.CURRENT_EXPECTED_RUN_RESULTS_VERSION
), "Sounds like you've bumped the run_results version and need to update this test!"
# If we need a newly generated run_results, uncomment the following line and commit the result
# self.generate_latest_run_results(project, current_run_results_schema_version)
self.compare_previous_results(project, current_run_results_schema_version, True, 0)

def test_backwards_compatible_run_results_versions(self, project):
# run_results schema version 4 and greater should always be forward compatible
for schema_version in range(4, self.CURRENT_EXPECTED_RUN_RESULTS_VERSION):
self.compare_previous_results(project, schema_version, True, 0)

0 comments on commit 6c1822f

Please sign in to comment.