From 4997771e71600ba6a50a003089a9ab2789ae5433 Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Fri, 29 Sep 2023 17:27:30 -0400 Subject: [PATCH] intitial python impl --- dbt/adapters/databricks/relation.py | 42 ++-- .../databricks/relation_configs/__init__.py | 5 + .../databricks/relation_configs/base.py | 23 +++ .../relation_configs/materialized_view.py | 183 ++++++++++++++++++ .../databricks/relation_configs/policies.py | 4 - dbt/adapters/databricks/utils.py | 25 ++- 6 files changed, 252 insertions(+), 30 deletions(-) create mode 100644 dbt/adapters/databricks/relation_configs/__init__.py create mode 100644 dbt/adapters/databricks/relation_configs/base.py create mode 100644 dbt/adapters/databricks/relation_configs/materialized_view.py diff --git a/dbt/adapters/databricks/relation.py b/dbt/adapters/databricks/relation.py index f26f43499..990b49a60 100644 --- a/dbt/adapters/databricks/relation.py +++ b/dbt/adapters/databricks/relation.py @@ -1,8 +1,9 @@ from dataclasses import dataclass, field from typing import Any, Dict, Optional, Type from dbt.contracts.relation import ( - ComponentName, + ComponentName, RelationType ) + from dbt.adapters.base.relation import BaseRelation, Policy from dbt.adapters.spark.impl import KEY_TABLE_OWNER, KEY_TABLE_STATISTICS from dbt.dataclass_schema import StrEnum @@ -13,36 +14,27 @@ KEY_TABLE_PROVIDER = "Provider" - -@dataclass -class DatabricksQuotePolicy(Policy): - database: bool = True - schema: bool = True - identifier: bool = True - - -@dataclass -class DatabricksIncludePolicy(Policy): - database: bool = True - schema: bool = True - identifier: bool = True - - -class DatabricksRelationType(StrEnum): - Table = "table" - View = "view" - CTE = "cte" - MaterializedView = "materializedview" - External = "external" - StreamingTable = "streamingtable" - - @dataclass(frozen=True, eq=False, repr=False) class DatabricksRelation(BaseRelation): type: Optional[DatabricksRelationType] = None # type: ignore quote_policy: Policy = field(default_factory=lambda: DatabricksQuotePolicy()) include_policy: Policy = field(default_factory=lambda: DatabricksIncludePolicy()) quote_character: str = "`" + relation_configs = { + RelationType.MaterializedView.value: RedshiftMaterializedViewConfig, + } + renameable_relations = frozenset( + { + RelationType.View, + RelationType.Table, + } + ) + replaceable_relations = frozenset( + { + RelationType.View, + RelationType.Table, + } + ) metadata: Optional[Dict[str, Any]] = None diff --git a/dbt/adapters/databricks/relation_configs/__init__.py b/dbt/adapters/databricks/relation_configs/__init__.py new file mode 100644 index 000000000..32463953e --- /dev/null +++ b/dbt/adapters/databricks/relation_configs/__init__.py @@ -0,0 +1,5 @@ + +from dbt.adapters.databricks.relation_configs.policies import ( + DatabricksIncludePolicy, + DatabricksQuotePolicy, +) diff --git a/dbt/adapters/databricks/relation_configs/base.py b/dbt/adapters/databricks/relation_configs/base.py new file mode 100644 index 000000000..e80bb1b42 --- /dev/null +++ b/dbt/adapters/databricks/relation_configs/base.py @@ -0,0 +1,23 @@ +from dataclasses import dataclass + +from dbt.adapters.base.relation import Policy +from dbt.adapters.relation_configs import RelationConfigBase + +from dbt.adapters.databricks.relation_configs.policies import ( + DatabricksIncludePolicy, + DatabricksQuotePolicy, +) + +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class DatabricksRelationConfigBase(RelationConfigBase): + """ + This base class implements a few boilerplate methods and provides some light structure for Databricks relations. + """ + + @classmethod + def include_policy(cls) -> Policy: + return DatabricksIncludePolicy() + + @classmethod + def quote_policy(cls) -> Policy: + return DatabricksQuotePolicy() diff --git a/dbt/adapters/databricks/relation_configs/materialized_view.py b/dbt/adapters/databricks/relation_configs/materialized_view.py new file mode 100644 index 000000000..7aeb0792f --- /dev/null +++ b/dbt/adapters/databricks/relation_configs/materialized_view.py @@ -0,0 +1,183 @@ +from dataclasses import dataclass +from typing import Optional, Set + +import agate +from dbt.adapters.relation_configs import ( + RelationResults, + RelationConfigChange, + RelationConfigValidationMixin, + RelationConfigValidationRule, +) +from dbt.contracts.graph.nodes import ModelNode +from dbt.contracts.relation import ComponentName +from dbt.exceptions import DbtRuntimeError + +from dbt.adapters.databricks.relation_configs.base import DatabricksRelationConfigBase + +from dbt.adapters.databricks.utils import evaluate_bool + + +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class DatabricksMaterializedViewConfig(DatabricksRelationConfigBase, RelationConfigValidationMixin): + """ + This config follow the specs found here: + https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-create-materialized-view.html + + The following parameters are configurable by dbt: + - mv_name: name of the materialized view + - query: the query that defines the view + - partition: + - schedule + + There are currently no non-configurable parameters. + """ + + mv_name: str + schema_name: str + database_name: str + query: str + backup: bool = True + partition: None # to be done + schedule: bool = False + + @property + def path(self) -> str: + return ".".join( + part + for part in [self.database_name, self.schema_name, self.mv_name] + if part is not None + ) + + # can be filled out later + # @property + # def validation_rules(self) -> Set[RelationConfigValidationRule]: + # # sort and dist rules get run by default with the mixin + # return {} + + @classmethod + def from_dict(cls, config_dict) -> "DatabricksMaterializedViewConfig": + kwargs_dict = { + "mv_name": cls._render_part(ComponentName.Identifier, config_dict.get("mv_name")), + "schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")), + "database_name": cls._render_part( + ComponentName.Database, config_dict.get("database_name") + ), + "query": config_dict.get("query"), + "schedule": config_dict.get("schedule"), + } + + materialized_view: "DatabricksMaterializedViewConfig" = super().from_dict(kwargs_dict) # type: ignore + return materialized_view + + @classmethod + def parse_model_node(cls, model_node: ModelNode) -> dict: + config_dict = { + "mv_name": model_node.identifier, + "schema_name": model_node.schema, + "database_name": model_node.database, + } + + autorefresh_value = model_node.config.extra.get("schedule") + if autorefresh_value is not None: + config_dict["schedule"] = evaluate_bool(autorefresh_value) + + if query := model_node.compiled_code: + config_dict.update({"query": query.strip()}) + + if model_node.config.get("partition"): + config_dict.update({"partition": DatabricksPartitionConfig.parse_model_node(model_node)}) + + return config_dict + + @classmethod + def parse_relation_results(cls, relation_results: RelationResults) -> dict: + """ + Translate agate objects from the database into a standard dictionary. + + Args: + relation_results: the description of the materialized view from the database in this format: + + { + "materialized_view": agate.Table( + agate.Row({ + "database": "", + "schema": "", + "table": "", + "diststyle": "", # e.g. EVEN | KEY(column1) | AUTO(ALL) | AUTO(KEY(id)), + "sortkey1": "", + "autorefresh: any("t", "f"), + }) + ), + "query": agate.Table( + agate.Row({"definition": "")} + ), + } + + Additional columns in either value is fine, as long as `sortkey` and `sortstyle` are available. + + Returns: a standard dictionary describing this `DatabricksMaterializedViewConfig` instance + """ + materialized_view: agate.Row = cls._get_first_row( + relation_results.get("materialized_view") + ) + query: agate.Row = cls._get_first_row(relation_results.get("query")) + + config_dict = { + "mv_name": materialized_view.get("table"), + "schema_name": materialized_view.get("schema"), + "database_name": materialized_view.get("database"), + "autorefresh": materialized_view.get("autorefresh"), + "query": cls._parse_query(query.get("definition")), + } + + # the default for materialized views differs from the default for diststyle in general + # only set it if we got a value + if materialized_view.get("diststyle"): + config_dict.update( + {"dist": DatabricksDistConfig.parse_relation_results(materialized_view)} + ) + + # TODO: this only shows the first column in the sort key + if materialized_view.get("sortkey1"): + config_dict.update( + {"sort": DatabricksSortConfig.parse_relation_results(materialized_view)} + ) + + return config_dict + +# @dataclass(frozen=True, eq=True, unsafe_hash=True) +# class DatabricksAutoRefreshConfigChange(RelationConfigChange): +# context: Optional[bool] = None + +# @property +# def requires_full_refresh(self) -> bool: +# return False + + + +@dataclass +class DatabricksMaterializedViewConfigChangeset: + dist: Optional[DatabricksPartitionConfigChange] = None + autorefresh: Optional[DatabricksAutoRefreshConfigChange] = None + + @property + def requires_full_refresh(self) -> bool: + return any( + { + self.autorefresh.requires_full_refresh if self.autorefresh else False, + self.backup.requires_full_refresh if self.backup else False, + self.dist.requires_full_refresh if self.dist else False, + self.sort.requires_full_refresh if self.sort else False, + } + ) + + @property + def has_changes(self) -> bool: + return any( + { + self.backup if self.backup else False, + self.dist if self.dist else False, + self.sort if self.sort else False, + self.autorefresh if self.autorefresh else False, + } + ) diff --git a/dbt/adapters/databricks/relation_configs/policies.py b/dbt/adapters/databricks/relation_configs/policies.py index 0b4a4769e..e6405590a 100644 --- a/dbt/adapters/databricks/relation_configs/policies.py +++ b/dbt/adapters/databricks/relation_configs/policies.py @@ -3,10 +3,6 @@ from dbt.adapters.base.relation import Policy from dbt.dataclass_schema import StrEnum - -MAX_CHARACTERS_IN_IDENTIFIER = 127 - - class DatabricksRelationType(StrEnum): Table = "table" View = "view" diff --git a/dbt/adapters/databricks/utils.py b/dbt/adapters/databricks/utils.py index 9fbbc411c..26dc84af1 100644 --- a/dbt/adapters/databricks/utils.py +++ b/dbt/adapters/databricks/utils.py @@ -1,7 +1,7 @@ import functools import inspect import re -from typing import Any, Callable, Type, TypeVar +from typing import Any, Callable, Type, TypeVar, Union from dbt.adapters.base import BaseAdapter from jinja2.runtime import Undefined @@ -77,3 +77,26 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: def remove_ansi(line: str) -> str: ansi_escape = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]") return ansi_escape.sub("", line) + +def evaluate_bool_str(value: str) -> bool: + value = value.strip().lower() + if value == "true": + return True + elif value == "false": + return False + else: + raise ValueError(f"Invalid boolean string value: {value}") + + +def evaluate_bool(value: Union[str, bool]) -> bool: + if not value: + return False + if isinstance(value, bool): + return value + elif isinstance(value, str): + return evaluate_bool_str(value) + else: + raise TypeError( + f"Invalid type for boolean evaluation, " + f"expecting boolean or str, recieved: {type(value)}" + )