From 4ac2b48366b197455b23672494dd94fcb14f2218 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 29 May 2024 14:26:28 +0100 Subject: [PATCH 01/12] Move catalog artifact schema to dbt_common --- core/dbt/artifacts/exceptions/__init__.py | 2 +- core/dbt/artifacts/exceptions/schemas.py | 31 --- core/dbt/artifacts/schemas/base.py | 175 ------------- core/dbt/artifacts/schemas/base/__init__.py | 1 + .../dbt/artifacts/schemas/catalog/__init__.py | 3 +- .../artifacts/schemas/catalog/v1/__init__.py | 0 .../artifacts/schemas/catalog/v1/catalog.py | 112 --------- core/dbt/version.py | 232 ------------------ core/dbt/version/__init__.py | 1 + 9 files changed, 4 insertions(+), 553 deletions(-) delete mode 100644 core/dbt/artifacts/exceptions/schemas.py delete mode 100644 core/dbt/artifacts/schemas/base.py create mode 100644 core/dbt/artifacts/schemas/base/__init__.py delete mode 100644 core/dbt/artifacts/schemas/catalog/v1/__init__.py delete mode 100644 core/dbt/artifacts/schemas/catalog/v1/catalog.py delete mode 100644 core/dbt/version.py create mode 100644 core/dbt/version/__init__.py diff --git a/core/dbt/artifacts/exceptions/__init__.py b/core/dbt/artifacts/exceptions/__init__.py index ad8d4ae51b7..991cd457dad 100644 --- a/core/dbt/artifacts/exceptions/__init__.py +++ b/core/dbt/artifacts/exceptions/__init__.py @@ -1 +1 @@ -from dbt.artifacts.exceptions.schemas import IncompatibleSchemaError +from dbt_common.artifacts.exceptions import * # noqa diff --git a/core/dbt/artifacts/exceptions/schemas.py b/core/dbt/artifacts/exceptions/schemas.py deleted file mode 100644 index c9f1b0e151f..00000000000 --- a/core/dbt/artifacts/exceptions/schemas.py +++ /dev/null @@ -1,31 +0,0 @@ -from typing import Optional - -from dbt_common.exceptions import DbtRuntimeError - - -class IncompatibleSchemaError(DbtRuntimeError): - def __init__(self, expected: str, found: Optional[str] = None) -> None: - self.expected = expected - self.found = found - self.filename = "input file" - - super().__init__(msg=self.get_message()) - - def add_filename(self, filename: str): - self.filename = filename - self.msg = self.get_message() - - def get_message(self) -> str: - found_str = "nothing" - if self.found is not None: - found_str = f'"{self.found}"' - - msg = ( - f'Expected a schema version of "{self.expected}" in ' - f"{self.filename}, but found {found_str}. Are you running with a " - f"different version of dbt?" 
- ) - return msg - - CODE = 10014 - MESSAGE = "Incompatible Schema" diff --git a/core/dbt/artifacts/schemas/base.py b/core/dbt/artifacts/schemas/base.py deleted file mode 100644 index c807257a24b..00000000000 --- a/core/dbt/artifacts/schemas/base.py +++ /dev/null @@ -1,175 +0,0 @@ -import dataclasses -import functools -from datetime import datetime -from typing import Any, ClassVar, Dict, Optional, Type, TypeVar - -from mashumaro.jsonschema import build_json_schema -from mashumaro.jsonschema.dialects import DRAFT_2020_12 - -from dbt.artifacts.exceptions import IncompatibleSchemaError -from dbt.version import __version__ -from dbt_common.clients.system import read_json, write_json -from dbt_common.dataclass_schema import dbtClassMixin -from dbt_common.events.functions import get_metadata_vars -from dbt_common.exceptions import DbtInternalError, DbtRuntimeError -from dbt_common.invocation import get_invocation_id - -BASE_SCHEMAS_URL = "https://schemas.getdbt.com/" -SCHEMA_PATH = "dbt/{name}/v{version}.json" - - -@dataclasses.dataclass -class SchemaVersion: - name: str - version: int - - @property - def path(self) -> str: - return SCHEMA_PATH.format(name=self.name, version=self.version) - - def __str__(self) -> str: - return BASE_SCHEMAS_URL + self.path - - -class Writable: - def write(self, path: str): - write_json(path, self.to_dict(omit_none=False, context={"artifact": True})) # type: ignore - - -class Readable: - @classmethod - def read(cls, path: str): - try: - data = read_json(path) - except (EnvironmentError, ValueError) as exc: - raise DbtRuntimeError( - f'Could not read {cls.__name__} at "{path}" as JSON: {exc}' - ) from exc - - return cls.from_dict(data) # type: ignore - - -# This is used in the ManifestMetadata, RunResultsMetadata, RunOperationResultMetadata, -# FreshnessMetadata, and CatalogMetadata classes -@dataclasses.dataclass -class BaseArtifactMetadata(dbtClassMixin): - dbt_schema_version: str - dbt_version: str = __version__ - generated_at: datetime = dataclasses.field(default_factory=datetime.utcnow) - invocation_id: Optional[str] = dataclasses.field(default_factory=get_invocation_id) - env: Dict[str, str] = dataclasses.field(default_factory=get_metadata_vars) - - def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): - dct = super().__post_serialize__(dct, context) - if dct["generated_at"] and dct["generated_at"].endswith("+00:00"): - dct["generated_at"] = dct["generated_at"].replace("+00:00", "") + "Z" - return dct - - -# This is used as a class decorator to set the schema_version in the -# 'dbt_schema_version' class attribute. (It's copied into the metadata objects.) 
-# Name attributes of SchemaVersion in classes with the 'schema_version' decorator: -# manifest -# run-results -# run-operation-result -# sources -# catalog -# remote-compile-result -# remote-execution-result -# remote-run-result -def schema_version(name: str, version: int): - def inner(cls: Type[VersionedSchema]): - cls.dbt_schema_version = SchemaVersion( - name=name, - version=version, - ) - return cls - - return inner - - -# This is used in the ArtifactMixin and RemoteCompileResultMixin classes -@dataclasses.dataclass -class VersionedSchema(dbtClassMixin): - dbt_schema_version: ClassVar[SchemaVersion] - - @classmethod - @functools.lru_cache - def json_schema(cls) -> Dict[str, Any]: - json_schema_obj = build_json_schema(cls, dialect=DRAFT_2020_12, with_dialect_uri=True) - json_schema = json_schema_obj.to_dict() - json_schema["$id"] = str(cls.dbt_schema_version) - return json_schema - - @classmethod - def is_compatible_version(cls, schema_version): - compatible_versions = [str(cls.dbt_schema_version)] - if hasattr(cls, "compatible_previous_versions"): - for name, version in cls.compatible_previous_versions(): - compatible_versions.append(str(SchemaVersion(name, version))) - return str(schema_version) in compatible_versions - - @classmethod - def read_and_check_versions(cls, path: str): - try: - data = read_json(path) - except (EnvironmentError, ValueError) as exc: - raise DbtRuntimeError( - f'Could not read {cls.__name__} at "{path}" as JSON: {exc}' - ) from exc - - # Check metadata version. There is a class variable 'dbt_schema_version', but - # that doesn't show up in artifacts, where it only exists in the 'metadata' - # dictionary. - if hasattr(cls, "dbt_schema_version"): - if "metadata" in data and "dbt_schema_version" in data["metadata"]: - previous_schema_version = data["metadata"]["dbt_schema_version"] - # cls.dbt_schema_version is a SchemaVersion object - if not cls.is_compatible_version(previous_schema_version): - raise IncompatibleSchemaError( - expected=str(cls.dbt_schema_version), - found=previous_schema_version, - ) - - return cls.upgrade_schema_version(data) - - @classmethod - def upgrade_schema_version(cls, data): - """This will modify the data (dictionary) passed in to match the current - artifact schema code, if necessary. This is the default method, which - just returns the instantiated object via from_dict.""" - return cls.from_dict(data) - - -T = TypeVar("T", bound="ArtifactMixin") - - -# metadata should really be a Generic[T_M] where T_M is a TypeVar bound to -# BaseArtifactMetadata. Unfortunately this isn't possible due to a mypy issue: -# https://github.com/python/mypy/issues/7520 -# This is used in the WritableManifest, RunResultsArtifact, RunOperationResultsArtifact, -# and CatalogArtifact -@dataclasses.dataclass(init=False) -class ArtifactMixin(VersionedSchema, Writable, Readable): - metadata: BaseArtifactMetadata - - @classmethod - def validate(cls, data): - super().validate(data) - if cls.dbt_schema_version is None: - raise DbtInternalError("Cannot call from_dict with no schema version!") - - -def get_artifact_schema_version(dct: dict) -> int: - schema_version = dct.get("metadata", {}).get("dbt_schema_version", None) - if not schema_version: - raise ValueError("Artifact is missing schema version") - - # schema_version is in this format: https://schemas.getdbt.com/dbt/manifest/v10.json - # What the code below is doing: - # 1. Split on "/" – v10.json - # 2. Split on "." – v10 - # 3. Skip first character – 10 - # 4. 
Convert to int - # TODO: If this gets more complicated, turn into a regex - return int(schema_version.split("/")[-1].split(".")[0][1:]) diff --git a/core/dbt/artifacts/schemas/base/__init__.py b/core/dbt/artifacts/schemas/base/__init__.py new file mode 100644 index 00000000000..9098b88eea1 --- /dev/null +++ b/core/dbt/artifacts/schemas/base/__init__.py @@ -0,0 +1 @@ +from dbt_common.artifacts.schemas.base import * # noqa diff --git a/core/dbt/artifacts/schemas/catalog/__init__.py b/core/dbt/artifacts/schemas/catalog/__init__.py index 1123f801adf..320b4edff87 100644 --- a/core/dbt/artifacts/schemas/catalog/__init__.py +++ b/core/dbt/artifacts/schemas/catalog/__init__.py @@ -1,2 +1 @@ -# alias to latest -from dbt.artifacts.schemas.catalog.v1.catalog import * # noqa +from dbt_common.artifacts.schemas.catalog import * # noqa diff --git a/core/dbt/artifacts/schemas/catalog/v1/__init__.py b/core/dbt/artifacts/schemas/catalog/v1/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/core/dbt/artifacts/schemas/catalog/v1/catalog.py b/core/dbt/artifacts/schemas/catalog/v1/catalog.py deleted file mode 100644 index d6d02608bca..00000000000 --- a/core/dbt/artifacts/schemas/catalog/v1/catalog.py +++ /dev/null @@ -1,112 +0,0 @@ -from dataclasses import dataclass, field -from datetime import datetime -from typing import Any, Dict, List, NamedTuple, Optional, Union - -from dbt.artifacts.schemas.base import ( - ArtifactMixin, - BaseArtifactMetadata, - schema_version, -) -from dbt_common.dataclass_schema import dbtClassMixin -from dbt_common.utils.formatting import lowercase - -Primitive = Union[bool, str, float, None] -PrimitiveDict = Dict[str, Primitive] - -CatalogKey = NamedTuple( - "CatalogKey", [("database", Optional[str]), ("schema", str), ("name", str)] -) - - -@dataclass -class StatsItem(dbtClassMixin): - id: str - label: str - value: Primitive - include: bool - description: Optional[str] = None - - -StatsDict = Dict[str, StatsItem] - - -@dataclass -class ColumnMetadata(dbtClassMixin): - type: str - index: int - name: str - comment: Optional[str] = None - - -ColumnMap = Dict[str, ColumnMetadata] - - -@dataclass -class TableMetadata(dbtClassMixin): - type: str - schema: str - name: str - database: Optional[str] = None - comment: Optional[str] = None - owner: Optional[str] = None - - -@dataclass -class CatalogTable(dbtClassMixin): - metadata: TableMetadata - columns: ColumnMap - stats: StatsDict - # the same table with two unique IDs will just be listed two times - unique_id: Optional[str] = None - - def key(self) -> CatalogKey: - return CatalogKey( - lowercase(self.metadata.database), - self.metadata.schema.lower(), - self.metadata.name.lower(), - ) - - -@dataclass -class CatalogMetadata(BaseArtifactMetadata): - dbt_schema_version: str = field( - default_factory=lambda: str(CatalogArtifact.dbt_schema_version) - ) - - -@dataclass -class CatalogResults(dbtClassMixin): - nodes: Dict[str, CatalogTable] - sources: Dict[str, CatalogTable] - errors: Optional[List[str]] = None - _compile_results: Optional[Any] = None - - def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): - dct = super().__post_serialize__(dct, context) - if "_compile_results" in dct: - del dct["_compile_results"] - return dct - - -@dataclass -@schema_version("catalog", 1) -class CatalogArtifact(CatalogResults, ArtifactMixin): - metadata: CatalogMetadata - - @classmethod - def from_results( - cls, - generated_at: datetime, - nodes: Dict[str, CatalogTable], - sources: Dict[str, CatalogTable], - 
compile_results: Optional[Any], - errors: Optional[List[str]], - ) -> "CatalogArtifact": - meta = CatalogMetadata(generated_at=generated_at) - return cls( - metadata=meta, - nodes=nodes, - sources=sources, - errors=errors, - _compile_results=compile_results, - ) diff --git a/core/dbt/version.py b/core/dbt/version.py deleted file mode 100644 index a4a219e9529..00000000000 --- a/core/dbt/version.py +++ /dev/null @@ -1,232 +0,0 @@ -import glob -import importlib -import importlib.util -import json -import os -from typing import Iterator, List, Optional, Tuple - -import requests - -import dbt_common.semver as semver -from dbt_common.ui import green, red, yellow - -PYPI_VERSION_URL = "https://pypi.org/pypi/dbt-core/json" - - -def get_version_information() -> str: - installed = get_installed_version() - latest = get_latest_version() - - core_msg_lines, core_info_msg = _get_core_msg_lines(installed, latest) - core_msg = _format_core_msg(core_msg_lines) - plugin_version_msg = _get_plugins_msg(installed) - - msg_lines = [core_msg] - - if core_info_msg != "": - msg_lines.append(core_info_msg) - - msg_lines.append(plugin_version_msg) - msg_lines.append("") - - return "\n\n".join(msg_lines) - - -def get_installed_version() -> semver.VersionSpecifier: - return semver.VersionSpecifier.from_version_string(__version__) - - -def get_latest_version( - version_url: str = PYPI_VERSION_URL, -) -> Optional[semver.VersionSpecifier]: - try: - resp = requests.get(version_url, timeout=1) - data = resp.json() - version_string = data["info"]["version"] - except (json.JSONDecodeError, KeyError, requests.RequestException): - return None - - return semver.VersionSpecifier.from_version_string(version_string) - - -def _get_core_msg_lines(installed, latest) -> Tuple[List[List[str]], str]: - installed_s = installed.to_version_string(skip_matcher=True) - installed_line = ["installed", installed_s, ""] - update_info = "" - - if latest is None: - update_info = ( - " The latest version of dbt-core could not be determined!\n" - " Make sure that the following URL is accessible:\n" - f" {PYPI_VERSION_URL}" - ) - return [installed_line], update_info - - latest_s = latest.to_version_string(skip_matcher=True) - latest_line = ["latest", latest_s, green("Up to date!")] - - if installed > latest: - latest_line[2] = yellow("Ahead of latest version!") - elif installed < latest: - latest_line[2] = yellow("Update available!") - update_info = ( - " Your version of dbt-core is out of date!\n" - " You can find instructions for upgrading here:\n" - " https://docs.getdbt.com/docs/installation" - ) - - return [ - installed_line, - latest_line, - ], update_info - - -def _format_core_msg(lines: List[List[str]]) -> str: - msg = "Core:\n" - msg_lines = [] - - for name, version, update_msg in _pad_lines(lines, seperator=":"): - line_msg = f" - {name} {version}" - if update_msg != "": - line_msg += f" - {update_msg}" - msg_lines.append(line_msg) - - return msg + "\n".join(msg_lines) - - -def _get_plugins_msg(installed: semver.VersionSpecifier) -> str: - msg_lines = ["Plugins:"] - - plugins = [] - display_update_msg = False - for name, version_s in _get_dbt_plugins_info(): - compatability_msg, needs_update = _get_plugin_msg_info(name, version_s, installed) - if needs_update: - display_update_msg = True - plugins.append([name, version_s, compatability_msg]) - - for plugin in _pad_lines(plugins, seperator=":"): - msg_lines.append(_format_single_plugin(plugin, "")) - - if display_update_msg: - update_msg = ( - " At least one plugin is out of date or 
incompatible with dbt-core.\n" - " You can find instructions for upgrading here:\n" - " https://docs.getdbt.com/docs/installation" - ) - msg_lines += ["", update_msg] - - return "\n".join(msg_lines) - - -def _get_plugin_msg_info( - name: str, version_s: str, core: semver.VersionSpecifier -) -> Tuple[str, bool]: - plugin = semver.VersionSpecifier.from_version_string(version_s) - latest_plugin = get_latest_version(version_url=get_package_pypi_url(name)) - - needs_update = False - - if plugin.major != core.major or plugin.minor != core.minor: - compatibility_msg = red("Not compatible!") - needs_update = True - return (compatibility_msg, needs_update) - - if not latest_plugin: - compatibility_msg = yellow("Could not determine latest version") - return (compatibility_msg, needs_update) - - if plugin < latest_plugin: - compatibility_msg = yellow("Update available!") - needs_update = True - elif plugin > latest_plugin: - compatibility_msg = yellow("Ahead of latest version!") - else: - compatibility_msg = green("Up to date!") - - return (compatibility_msg, needs_update) - - -def _format_single_plugin(plugin: List[str], update_msg: str) -> str: - name, version_s, compatability_msg = plugin - msg = f" - {name} {version_s} - {compatability_msg}" - if update_msg != "": - msg += f"\n{update_msg}\n" - return msg - - -def _pad_lines(lines: List[List[str]], seperator: str = "") -> List[List[str]]: - if len(lines) == 0: - return [] - - # count the max line length for each column in the line - counter = [0] * len(lines[0]) - for line in lines: - for i, item in enumerate(line): - counter[i] = max(counter[i], len(item)) - - result: List[List[str]] = [] - for i, line in enumerate(lines): - # add another list to hold padded strings - if len(result) == i: - result.append([""] * len(line)) - - # iterate over columns in the line - for j, item in enumerate(line): - # the last column does not need padding - if j == len(line) - 1: - result[i][j] = item - continue - - # if the following column has no length - # the string does not need padding - if counter[j + 1] == 0: - result[i][j] = item - continue - - # only add the seperator to the first column - offset = 0 - if j == 0 and seperator != "": - item += seperator - offset = len(seperator) - - result[i][j] = item.ljust(counter[j] + offset) - - return result - - -def get_package_pypi_url(package_name: str) -> str: - return f"https://pypi.org/pypi/dbt-{package_name}/json" - - -def _get_dbt_plugins_info() -> Iterator[Tuple[str, str]]: - for plugin_name in _get_adapter_plugin_names(): - if plugin_name == "core": - continue - try: - mod = importlib.import_module(f"dbt.adapters.{plugin_name}.__version__") - except ImportError: - # not an adapter - continue - yield plugin_name, mod.version # type: ignore - - -def _get_adapter_plugin_names() -> Iterator[str]: - spec = importlib.util.find_spec("dbt.adapters") - # If None, then nothing provides an importable 'dbt.adapters', so we will - # not be reporting plugin versions today - if spec is None or spec.submodule_search_locations is None: - return - - for adapters_path in spec.submodule_search_locations: - version_glob = os.path.join(adapters_path, "*", "__version__.py") - for version_path in glob.glob(version_glob): - # the path is like .../dbt/adapters/{plugin_name}/__version__.py - # except it could be \\ on windows! 
- plugin_root, _ = os.path.split(version_path) - _, plugin_name = os.path.split(plugin_root) - yield plugin_name - - -__version__ = "1.9.0a1" -installed = get_installed_version() diff --git a/core/dbt/version/__init__.py b/core/dbt/version/__init__.py new file mode 100644 index 00000000000..a1034ae851d --- /dev/null +++ b/core/dbt/version/__init__.py @@ -0,0 +1 @@ +from dbt_common.version import * # noqa From e4ef2429c526ec7b5ecdfb94f3e00d70c29fa7c3 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 29 May 2024 16:38:38 +0100 Subject: [PATCH 02/12] Discard changes to core/dbt/artifacts/exceptions/__init__.py --- core/dbt/artifacts/exceptions/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/dbt/artifacts/exceptions/__init__.py b/core/dbt/artifacts/exceptions/__init__.py index 991cd457dad..ad8d4ae51b7 100644 --- a/core/dbt/artifacts/exceptions/__init__.py +++ b/core/dbt/artifacts/exceptions/__init__.py @@ -1 +1 @@ -from dbt_common.artifacts.exceptions import * # noqa +from dbt.artifacts.exceptions.schemas import IncompatibleSchemaError From 48c42f25cee25746e53694c4c2983290980bde85 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 29 May 2024 16:38:41 +0100 Subject: [PATCH 03/12] Discard changes to core/dbt/artifacts/exceptions/schemas.py --- core/dbt/artifacts/exceptions/schemas.py | 31 ++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 core/dbt/artifacts/exceptions/schemas.py diff --git a/core/dbt/artifacts/exceptions/schemas.py b/core/dbt/artifacts/exceptions/schemas.py new file mode 100644 index 00000000000..c9f1b0e151f --- /dev/null +++ b/core/dbt/artifacts/exceptions/schemas.py @@ -0,0 +1,31 @@ +from typing import Optional + +from dbt_common.exceptions import DbtRuntimeError + + +class IncompatibleSchemaError(DbtRuntimeError): + def __init__(self, expected: str, found: Optional[str] = None) -> None: + self.expected = expected + self.found = found + self.filename = "input file" + + super().__init__(msg=self.get_message()) + + def add_filename(self, filename: str): + self.filename = filename + self.msg = self.get_message() + + def get_message(self) -> str: + found_str = "nothing" + if self.found is not None: + found_str = f'"{self.found}"' + + msg = ( + f'Expected a schema version of "{self.expected}" in ' + f"{self.filename}, but found {found_str}. Are you running with a " + f"different version of dbt?" 
+ ) + return msg + + CODE = 10014 + MESSAGE = "Incompatible Schema" From 282facab88bf02656adc5618bc2dba1eff7f2106 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 29 May 2024 16:38:44 +0100 Subject: [PATCH 04/12] Discard changes to core/dbt/artifacts/schemas/base.py --- core/dbt/artifacts/schemas/base.py | 175 +++++++++++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 core/dbt/artifacts/schemas/base.py diff --git a/core/dbt/artifacts/schemas/base.py b/core/dbt/artifacts/schemas/base.py new file mode 100644 index 00000000000..c807257a24b --- /dev/null +++ b/core/dbt/artifacts/schemas/base.py @@ -0,0 +1,175 @@ +import dataclasses +import functools +from datetime import datetime +from typing import Any, ClassVar, Dict, Optional, Type, TypeVar + +from mashumaro.jsonschema import build_json_schema +from mashumaro.jsonschema.dialects import DRAFT_2020_12 + +from dbt.artifacts.exceptions import IncompatibleSchemaError +from dbt.version import __version__ +from dbt_common.clients.system import read_json, write_json +from dbt_common.dataclass_schema import dbtClassMixin +from dbt_common.events.functions import get_metadata_vars +from dbt_common.exceptions import DbtInternalError, DbtRuntimeError +from dbt_common.invocation import get_invocation_id + +BASE_SCHEMAS_URL = "https://schemas.getdbt.com/" +SCHEMA_PATH = "dbt/{name}/v{version}.json" + + +@dataclasses.dataclass +class SchemaVersion: + name: str + version: int + + @property + def path(self) -> str: + return SCHEMA_PATH.format(name=self.name, version=self.version) + + def __str__(self) -> str: + return BASE_SCHEMAS_URL + self.path + + +class Writable: + def write(self, path: str): + write_json(path, self.to_dict(omit_none=False, context={"artifact": True})) # type: ignore + + +class Readable: + @classmethod + def read(cls, path: str): + try: + data = read_json(path) + except (EnvironmentError, ValueError) as exc: + raise DbtRuntimeError( + f'Could not read {cls.__name__} at "{path}" as JSON: {exc}' + ) from exc + + return cls.from_dict(data) # type: ignore + + +# This is used in the ManifestMetadata, RunResultsMetadata, RunOperationResultMetadata, +# FreshnessMetadata, and CatalogMetadata classes +@dataclasses.dataclass +class BaseArtifactMetadata(dbtClassMixin): + dbt_schema_version: str + dbt_version: str = __version__ + generated_at: datetime = dataclasses.field(default_factory=datetime.utcnow) + invocation_id: Optional[str] = dataclasses.field(default_factory=get_invocation_id) + env: Dict[str, str] = dataclasses.field(default_factory=get_metadata_vars) + + def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): + dct = super().__post_serialize__(dct, context) + if dct["generated_at"] and dct["generated_at"].endswith("+00:00"): + dct["generated_at"] = dct["generated_at"].replace("+00:00", "") + "Z" + return dct + + +# This is used as a class decorator to set the schema_version in the +# 'dbt_schema_version' class attribute. (It's copied into the metadata objects.) 
+# Name attributes of SchemaVersion in classes with the 'schema_version' decorator: +# manifest +# run-results +# run-operation-result +# sources +# catalog +# remote-compile-result +# remote-execution-result +# remote-run-result +def schema_version(name: str, version: int): + def inner(cls: Type[VersionedSchema]): + cls.dbt_schema_version = SchemaVersion( + name=name, + version=version, + ) + return cls + + return inner + + +# This is used in the ArtifactMixin and RemoteCompileResultMixin classes +@dataclasses.dataclass +class VersionedSchema(dbtClassMixin): + dbt_schema_version: ClassVar[SchemaVersion] + + @classmethod + @functools.lru_cache + def json_schema(cls) -> Dict[str, Any]: + json_schema_obj = build_json_schema(cls, dialect=DRAFT_2020_12, with_dialect_uri=True) + json_schema = json_schema_obj.to_dict() + json_schema["$id"] = str(cls.dbt_schema_version) + return json_schema + + @classmethod + def is_compatible_version(cls, schema_version): + compatible_versions = [str(cls.dbt_schema_version)] + if hasattr(cls, "compatible_previous_versions"): + for name, version in cls.compatible_previous_versions(): + compatible_versions.append(str(SchemaVersion(name, version))) + return str(schema_version) in compatible_versions + + @classmethod + def read_and_check_versions(cls, path: str): + try: + data = read_json(path) + except (EnvironmentError, ValueError) as exc: + raise DbtRuntimeError( + f'Could not read {cls.__name__} at "{path}" as JSON: {exc}' + ) from exc + + # Check metadata version. There is a class variable 'dbt_schema_version', but + # that doesn't show up in artifacts, where it only exists in the 'metadata' + # dictionary. + if hasattr(cls, "dbt_schema_version"): + if "metadata" in data and "dbt_schema_version" in data["metadata"]: + previous_schema_version = data["metadata"]["dbt_schema_version"] + # cls.dbt_schema_version is a SchemaVersion object + if not cls.is_compatible_version(previous_schema_version): + raise IncompatibleSchemaError( + expected=str(cls.dbt_schema_version), + found=previous_schema_version, + ) + + return cls.upgrade_schema_version(data) + + @classmethod + def upgrade_schema_version(cls, data): + """This will modify the data (dictionary) passed in to match the current + artifact schema code, if necessary. This is the default method, which + just returns the instantiated object via from_dict.""" + return cls.from_dict(data) + + +T = TypeVar("T", bound="ArtifactMixin") + + +# metadata should really be a Generic[T_M] where T_M is a TypeVar bound to +# BaseArtifactMetadata. Unfortunately this isn't possible due to a mypy issue: +# https://github.com/python/mypy/issues/7520 +# This is used in the WritableManifest, RunResultsArtifact, RunOperationResultsArtifact, +# and CatalogArtifact +@dataclasses.dataclass(init=False) +class ArtifactMixin(VersionedSchema, Writable, Readable): + metadata: BaseArtifactMetadata + + @classmethod + def validate(cls, data): + super().validate(data) + if cls.dbt_schema_version is None: + raise DbtInternalError("Cannot call from_dict with no schema version!") + + +def get_artifact_schema_version(dct: dict) -> int: + schema_version = dct.get("metadata", {}).get("dbt_schema_version", None) + if not schema_version: + raise ValueError("Artifact is missing schema version") + + # schema_version is in this format: https://schemas.getdbt.com/dbt/manifest/v10.json + # What the code below is doing: + # 1. Split on "/" – v10.json + # 2. Split on "." – v10 + # 3. Skip first character – 10 + # 4. 
Convert to int + # TODO: If this gets more complicated, turn into a regex + return int(schema_version.split("/")[-1].split(".")[0][1:]) From 0fbd25e586d29474c75d18478f20e854d1dae09a Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 29 May 2024 16:38:50 +0100 Subject: [PATCH 05/12] Discard changes to core/dbt/artifacts/schemas/base/__init__.py --- core/dbt/artifacts/schemas/base/__init__.py | 1 - 1 file changed, 1 deletion(-) delete mode 100644 core/dbt/artifacts/schemas/base/__init__.py diff --git a/core/dbt/artifacts/schemas/base/__init__.py b/core/dbt/artifacts/schemas/base/__init__.py deleted file mode 100644 index 9098b88eea1..00000000000 --- a/core/dbt/artifacts/schemas/base/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from dbt_common.artifacts.schemas.base import * # noqa From 029e61643f740dba72c0e46502f0b1bafc28f092 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 29 May 2024 16:39:05 +0100 Subject: [PATCH 06/12] Discard changes to core/dbt/artifacts/schemas/catalog/__init__.py --- core/dbt/artifacts/schemas/catalog/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/dbt/artifacts/schemas/catalog/__init__.py b/core/dbt/artifacts/schemas/catalog/__init__.py index 320b4edff87..1123f801adf 100644 --- a/core/dbt/artifacts/schemas/catalog/__init__.py +++ b/core/dbt/artifacts/schemas/catalog/__init__.py @@ -1 +1,2 @@ -from dbt_common.artifacts.schemas.catalog import * # noqa +# alias to latest +from dbt.artifacts.schemas.catalog.v1.catalog import * # noqa From 774b64a503054619c98d2186d8fb66dc6fa9dd54 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 29 May 2024 16:39:09 +0100 Subject: [PATCH 07/12] Discard changes to core/dbt/artifacts/schemas/catalog/v1/__init__.py --- core/dbt/artifacts/schemas/catalog/v1/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 core/dbt/artifacts/schemas/catalog/v1/__init__.py diff --git a/core/dbt/artifacts/schemas/catalog/v1/__init__.py b/core/dbt/artifacts/schemas/catalog/v1/__init__.py new file mode 100644 index 00000000000..e69de29bb2d From c7f943ed0ec497465ec056ab974da5f32336639c Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 29 May 2024 16:39:13 +0100 Subject: [PATCH 08/12] Discard changes to core/dbt/artifacts/schemas/catalog/v1/catalog.py --- .../artifacts/schemas/catalog/v1/catalog.py | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 core/dbt/artifacts/schemas/catalog/v1/catalog.py diff --git a/core/dbt/artifacts/schemas/catalog/v1/catalog.py b/core/dbt/artifacts/schemas/catalog/v1/catalog.py new file mode 100644 index 00000000000..d6d02608bca --- /dev/null +++ b/core/dbt/artifacts/schemas/catalog/v1/catalog.py @@ -0,0 +1,112 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Dict, List, NamedTuple, Optional, Union + +from dbt.artifacts.schemas.base import ( + ArtifactMixin, + BaseArtifactMetadata, + schema_version, +) +from dbt_common.dataclass_schema import dbtClassMixin +from dbt_common.utils.formatting import lowercase + +Primitive = Union[bool, str, float, None] +PrimitiveDict = Dict[str, Primitive] + +CatalogKey = NamedTuple( + "CatalogKey", [("database", Optional[str]), ("schema", str), ("name", str)] +) + + +@dataclass +class StatsItem(dbtClassMixin): + id: str + label: str + value: Primitive + include: bool + description: Optional[str] = None + + +StatsDict = Dict[str, StatsItem] + + +@dataclass +class ColumnMetadata(dbtClassMixin): + type: str + index: int + name: str + 
comment: Optional[str] = None + + +ColumnMap = Dict[str, ColumnMetadata] + + +@dataclass +class TableMetadata(dbtClassMixin): + type: str + schema: str + name: str + database: Optional[str] = None + comment: Optional[str] = None + owner: Optional[str] = None + + +@dataclass +class CatalogTable(dbtClassMixin): + metadata: TableMetadata + columns: ColumnMap + stats: StatsDict + # the same table with two unique IDs will just be listed two times + unique_id: Optional[str] = None + + def key(self) -> CatalogKey: + return CatalogKey( + lowercase(self.metadata.database), + self.metadata.schema.lower(), + self.metadata.name.lower(), + ) + + +@dataclass +class CatalogMetadata(BaseArtifactMetadata): + dbt_schema_version: str = field( + default_factory=lambda: str(CatalogArtifact.dbt_schema_version) + ) + + +@dataclass +class CatalogResults(dbtClassMixin): + nodes: Dict[str, CatalogTable] + sources: Dict[str, CatalogTable] + errors: Optional[List[str]] = None + _compile_results: Optional[Any] = None + + def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): + dct = super().__post_serialize__(dct, context) + if "_compile_results" in dct: + del dct["_compile_results"] + return dct + + +@dataclass +@schema_version("catalog", 1) +class CatalogArtifact(CatalogResults, ArtifactMixin): + metadata: CatalogMetadata + + @classmethod + def from_results( + cls, + generated_at: datetime, + nodes: Dict[str, CatalogTable], + sources: Dict[str, CatalogTable], + compile_results: Optional[Any], + errors: Optional[List[str]], + ) -> "CatalogArtifact": + meta = CatalogMetadata(generated_at=generated_at) + return cls( + metadata=meta, + nodes=nodes, + sources=sources, + errors=errors, + _compile_results=compile_results, + ) From 6fa4fc308208839c50cdbfd0faec88f86901d96c Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 29 May 2024 16:39:17 +0100 Subject: [PATCH 09/12] Discard changes to core/dbt/version.py --- core/dbt/version.py | 232 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 232 insertions(+) create mode 100644 core/dbt/version.py diff --git a/core/dbt/version.py b/core/dbt/version.py new file mode 100644 index 00000000000..a4a219e9529 --- /dev/null +++ b/core/dbt/version.py @@ -0,0 +1,232 @@ +import glob +import importlib +import importlib.util +import json +import os +from typing import Iterator, List, Optional, Tuple + +import requests + +import dbt_common.semver as semver +from dbt_common.ui import green, red, yellow + +PYPI_VERSION_URL = "https://pypi.org/pypi/dbt-core/json" + + +def get_version_information() -> str: + installed = get_installed_version() + latest = get_latest_version() + + core_msg_lines, core_info_msg = _get_core_msg_lines(installed, latest) + core_msg = _format_core_msg(core_msg_lines) + plugin_version_msg = _get_plugins_msg(installed) + + msg_lines = [core_msg] + + if core_info_msg != "": + msg_lines.append(core_info_msg) + + msg_lines.append(plugin_version_msg) + msg_lines.append("") + + return "\n\n".join(msg_lines) + + +def get_installed_version() -> semver.VersionSpecifier: + return semver.VersionSpecifier.from_version_string(__version__) + + +def get_latest_version( + version_url: str = PYPI_VERSION_URL, +) -> Optional[semver.VersionSpecifier]: + try: + resp = requests.get(version_url, timeout=1) + data = resp.json() + version_string = data["info"]["version"] + except (json.JSONDecodeError, KeyError, requests.RequestException): + return None + + return semver.VersionSpecifier.from_version_string(version_string) + + +def 
_get_core_msg_lines(installed, latest) -> Tuple[List[List[str]], str]: + installed_s = installed.to_version_string(skip_matcher=True) + installed_line = ["installed", installed_s, ""] + update_info = "" + + if latest is None: + update_info = ( + " The latest version of dbt-core could not be determined!\n" + " Make sure that the following URL is accessible:\n" + f" {PYPI_VERSION_URL}" + ) + return [installed_line], update_info + + latest_s = latest.to_version_string(skip_matcher=True) + latest_line = ["latest", latest_s, green("Up to date!")] + + if installed > latest: + latest_line[2] = yellow("Ahead of latest version!") + elif installed < latest: + latest_line[2] = yellow("Update available!") + update_info = ( + " Your version of dbt-core is out of date!\n" + " You can find instructions for upgrading here:\n" + " https://docs.getdbt.com/docs/installation" + ) + + return [ + installed_line, + latest_line, + ], update_info + + +def _format_core_msg(lines: List[List[str]]) -> str: + msg = "Core:\n" + msg_lines = [] + + for name, version, update_msg in _pad_lines(lines, seperator=":"): + line_msg = f" - {name} {version}" + if update_msg != "": + line_msg += f" - {update_msg}" + msg_lines.append(line_msg) + + return msg + "\n".join(msg_lines) + + +def _get_plugins_msg(installed: semver.VersionSpecifier) -> str: + msg_lines = ["Plugins:"] + + plugins = [] + display_update_msg = False + for name, version_s in _get_dbt_plugins_info(): + compatability_msg, needs_update = _get_plugin_msg_info(name, version_s, installed) + if needs_update: + display_update_msg = True + plugins.append([name, version_s, compatability_msg]) + + for plugin in _pad_lines(plugins, seperator=":"): + msg_lines.append(_format_single_plugin(plugin, "")) + + if display_update_msg: + update_msg = ( + " At least one plugin is out of date or incompatible with dbt-core.\n" + " You can find instructions for upgrading here:\n" + " https://docs.getdbt.com/docs/installation" + ) + msg_lines += ["", update_msg] + + return "\n".join(msg_lines) + + +def _get_plugin_msg_info( + name: str, version_s: str, core: semver.VersionSpecifier +) -> Tuple[str, bool]: + plugin = semver.VersionSpecifier.from_version_string(version_s) + latest_plugin = get_latest_version(version_url=get_package_pypi_url(name)) + + needs_update = False + + if plugin.major != core.major or plugin.minor != core.minor: + compatibility_msg = red("Not compatible!") + needs_update = True + return (compatibility_msg, needs_update) + + if not latest_plugin: + compatibility_msg = yellow("Could not determine latest version") + return (compatibility_msg, needs_update) + + if plugin < latest_plugin: + compatibility_msg = yellow("Update available!") + needs_update = True + elif plugin > latest_plugin: + compatibility_msg = yellow("Ahead of latest version!") + else: + compatibility_msg = green("Up to date!") + + return (compatibility_msg, needs_update) + + +def _format_single_plugin(plugin: List[str], update_msg: str) -> str: + name, version_s, compatability_msg = plugin + msg = f" - {name} {version_s} - {compatability_msg}" + if update_msg != "": + msg += f"\n{update_msg}\n" + return msg + + +def _pad_lines(lines: List[List[str]], seperator: str = "") -> List[List[str]]: + if len(lines) == 0: + return [] + + # count the max line length for each column in the line + counter = [0] * len(lines[0]) + for line in lines: + for i, item in enumerate(line): + counter[i] = max(counter[i], len(item)) + + result: List[List[str]] = [] + for i, line in enumerate(lines): + # add another list to 
hold padded strings + if len(result) == i: + result.append([""] * len(line)) + + # iterate over columns in the line + for j, item in enumerate(line): + # the last column does not need padding + if j == len(line) - 1: + result[i][j] = item + continue + + # if the following column has no length + # the string does not need padding + if counter[j + 1] == 0: + result[i][j] = item + continue + + # only add the seperator to the first column + offset = 0 + if j == 0 and seperator != "": + item += seperator + offset = len(seperator) + + result[i][j] = item.ljust(counter[j] + offset) + + return result + + +def get_package_pypi_url(package_name: str) -> str: + return f"https://pypi.org/pypi/dbt-{package_name}/json" + + +def _get_dbt_plugins_info() -> Iterator[Tuple[str, str]]: + for plugin_name in _get_adapter_plugin_names(): + if plugin_name == "core": + continue + try: + mod = importlib.import_module(f"dbt.adapters.{plugin_name}.__version__") + except ImportError: + # not an adapter + continue + yield plugin_name, mod.version # type: ignore + + +def _get_adapter_plugin_names() -> Iterator[str]: + spec = importlib.util.find_spec("dbt.adapters") + # If None, then nothing provides an importable 'dbt.adapters', so we will + # not be reporting plugin versions today + if spec is None or spec.submodule_search_locations is None: + return + + for adapters_path in spec.submodule_search_locations: + version_glob = os.path.join(adapters_path, "*", "__version__.py") + for version_path in glob.glob(version_glob): + # the path is like .../dbt/adapters/{plugin_name}/__version__.py + # except it could be \\ on windows! + plugin_root, _ = os.path.split(version_path) + _, plugin_name = os.path.split(plugin_root) + yield plugin_name + + +__version__ = "1.9.0a1" +installed = get_installed_version() From 20ef8805bdddd755b19a99ea219dfc9506293e7e Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 29 May 2024 16:47:09 +0100 Subject: [PATCH 10/12] Only remove StatsItem, StatsDict, TableMetadata --- .../dbt/artifacts/schemas/catalog/__init__.py | 1 + .../artifacts/schemas/catalog/v1/catalog.py | 23 +------------------ core/dbt/version/__init__.py | 1 - 3 files changed, 2 insertions(+), 23 deletions(-) delete mode 100644 core/dbt/version/__init__.py diff --git a/core/dbt/artifacts/schemas/catalog/__init__.py b/core/dbt/artifacts/schemas/catalog/__init__.py index 1123f801adf..5a5159611b2 100644 --- a/core/dbt/artifacts/schemas/catalog/__init__.py +++ b/core/dbt/artifacts/schemas/catalog/__init__.py @@ -1,2 +1,3 @@ # alias to latest from dbt.artifacts.schemas.catalog.v1.catalog import * # noqa +from dbt_common.artifacts.catalog import StatsDict, StatsItem, TableMetadata # noqa diff --git a/core/dbt/artifacts/schemas/catalog/v1/catalog.py b/core/dbt/artifacts/schemas/catalog/v1/catalog.py index d6d02608bca..90e387223b7 100644 --- a/core/dbt/artifacts/schemas/catalog/v1/catalog.py +++ b/core/dbt/artifacts/schemas/catalog/v1/catalog.py @@ -7,6 +7,7 @@ BaseArtifactMetadata, schema_version, ) +from dbt_common.artifacts.catalog import StatsDict, TableMetadata from dbt_common.dataclass_schema import dbtClassMixin from dbt_common.utils.formatting import lowercase @@ -18,18 +19,6 @@ ) -@dataclass -class StatsItem(dbtClassMixin): - id: str - label: str - value: Primitive - include: bool - description: Optional[str] = None - - -StatsDict = Dict[str, StatsItem] - - @dataclass class ColumnMetadata(dbtClassMixin): type: str @@ -41,16 +30,6 @@ class ColumnMetadata(dbtClassMixin): ColumnMap = Dict[str, ColumnMetadata] -@dataclass -class 
TableMetadata(dbtClassMixin): - type: str - schema: str - name: str - database: Optional[str] = None - comment: Optional[str] = None - owner: Optional[str] = None - - @dataclass class CatalogTable(dbtClassMixin): metadata: TableMetadata diff --git a/core/dbt/version/__init__.py b/core/dbt/version/__init__.py deleted file mode 100644 index a1034ae851d..00000000000 --- a/core/dbt/version/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from dbt_common.version import * # noqa From c8a1349d55794852640659c8083cdb196dc2fff8 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 29 May 2024 19:17:10 +0100 Subject: [PATCH 11/12] dbt_common.contracts.metadata --- core/dbt/artifacts/schemas/catalog/__init__.py | 2 +- core/dbt/artifacts/schemas/catalog/v1/catalog.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/dbt/artifacts/schemas/catalog/__init__.py b/core/dbt/artifacts/schemas/catalog/__init__.py index 5a5159611b2..185b4946d25 100644 --- a/core/dbt/artifacts/schemas/catalog/__init__.py +++ b/core/dbt/artifacts/schemas/catalog/__init__.py @@ -1,3 +1,3 @@ # alias to latest from dbt.artifacts.schemas.catalog.v1.catalog import * # noqa -from dbt_common.artifacts.catalog import StatsDict, StatsItem, TableMetadata # noqa +from dbt_common.contracts.metadata import StatsDict, StatsItem, TableMetadata diff --git a/core/dbt/artifacts/schemas/catalog/v1/catalog.py b/core/dbt/artifacts/schemas/catalog/v1/catalog.py index 90e387223b7..ca78e4e09ba 100644 --- a/core/dbt/artifacts/schemas/catalog/v1/catalog.py +++ b/core/dbt/artifacts/schemas/catalog/v1/catalog.py @@ -7,7 +7,7 @@ BaseArtifactMetadata, schema_version, ) -from dbt_common.artifacts.catalog import StatsDict, TableMetadata +from dbt_common.contracts.metadata import StatsDict, TableMetadata from dbt_common.dataclass_schema import dbtClassMixin from dbt_common.utils.formatting import lowercase From cb7b0610df293ff70dc2c14d6a215ce4b2d8b73e Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Wed, 29 May 2024 20:54:14 +0100 Subject: [PATCH 12/12] bump dbt-common version --- core/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/setup.py b/core/setup.py index e133b946fa4..f28a47e0814 100644 --- a/core/setup.py +++ b/core/setup.py @@ -71,7 +71,7 @@ "minimal-snowplow-tracker>=0.0.2,<0.1", "dbt-semantic-interfaces>=0.5.1,<0.6", # Minor versions for these are expected to be backwards-compatible - "dbt-common>=1.1.0,<2.0", + "dbt-common>=1.2.0,<2.0", "dbt-adapters>=1.1.1,<2.0", # ---- # Expect compatibility with all new versions of these packages, so lower bounds only.
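
The net effect of this series is that `StatsItem`, `StatsDict`, and `TableMetadata` are now imported from `dbt_common.contracts.metadata` (patches 10–11), `ColumnMetadata`, `CatalogTable`, and `CatalogArtifact` stay in `dbt.artifacts.schemas.catalog.v1.catalog` and are re-exported from `dbt.artifacts.schemas.catalog`, and the `dbt-common` lower bound moves to 1.2.0 (patch 12). A minimal usage sketch of that final state — assuming dbt-common ≥ 1.2.0 exposes these classes with the same fields shown in the hunks above — might look like:

```python
# Sketch only: exercises the re-exports left in place by this series.
from dbt.artifacts.schemas.catalog import (
    CatalogTable,
    ColumnMetadata,
    StatsItem,
    TableMetadata,
)

table = CatalogTable(
    # TableMetadata now lives in dbt_common.contracts.metadata
    metadata=TableMetadata(type="table", schema="analytics", name="orders"),
    # ColumnMetadata is still defined in dbt.artifacts.schemas.catalog.v1.catalog
    columns={"id": ColumnMetadata(type="integer", index=1, name="id")},
    stats={"row_count": StatsItem(id="row_count", label="Rows", value=42.0, include=True)},
)

# key() lowercases database/schema/name; database is None here
print(table.key())  # CatalogKey(database=None, schema='analytics', name='orders')
```

Callers that previously imported `StatsItem`, `StatsDict`, or `TableMetadata` from `dbt.artifacts.schemas.catalog` keep working, since the package `__init__.py` re-exports them from their new home in dbt-common.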