Skip to content

Commit

Permalink
intitial python impl
Browse files Browse the repository at this point in the history
  • Loading branch information
dataders committed Sep 29, 2023
1 parent ddce57f commit 4997771
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 30 deletions.
42 changes: 17 additions & 25 deletions dbt/adapters/databricks/relation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Type
from dbt.contracts.relation import (
ComponentName,
ComponentName, RelationType
)

from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.adapters.spark.impl import KEY_TABLE_OWNER, KEY_TABLE_STATISTICS
from dbt.dataclass_schema import StrEnum
Expand All @@ -13,36 +14,27 @@

KEY_TABLE_PROVIDER = "Provider"


@dataclass
class DatabricksQuotePolicy(Policy):
database: bool = True
schema: bool = True
identifier: bool = True


@dataclass
class DatabricksIncludePolicy(Policy):
database: bool = True
schema: bool = True
identifier: bool = True


class DatabricksRelationType(StrEnum):
Table = "table"
View = "view"
CTE = "cte"
MaterializedView = "materializedview"
External = "external"
StreamingTable = "streamingtable"


@dataclass(frozen=True, eq=False, repr=False)
class DatabricksRelation(BaseRelation):
type: Optional[DatabricksRelationType] = None # type: ignore
quote_policy: Policy = field(default_factory=lambda: DatabricksQuotePolicy())
include_policy: Policy = field(default_factory=lambda: DatabricksIncludePolicy())
quote_character: str = "`"
relation_configs = {
RelationType.MaterializedView.value: RedshiftMaterializedViewConfig,
}
renameable_relations = frozenset(
{
RelationType.View,
RelationType.Table,
}
)
replaceable_relations = frozenset(
{
RelationType.View,
RelationType.Table,
}
)

metadata: Optional[Dict[str, Any]] = None

Expand Down
5 changes: 5 additions & 0 deletions dbt/adapters/databricks/relation_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

from dbt.adapters.databricks.relation_configs.policies import (
DatabricksIncludePolicy,
DatabricksQuotePolicy,
)
23 changes: 23 additions & 0 deletions dbt/adapters/databricks/relation_configs/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from dataclasses import dataclass

from dbt.adapters.base.relation import Policy
from dbt.adapters.relation_configs import RelationConfigBase

from dbt.adapters.databricks.relation_configs.policies import (
DatabricksIncludePolicy,
DatabricksQuotePolicy,
)

@dataclass(frozen=True, eq=True, unsafe_hash=True)
class DatabricksRelationConfigBase(RelationConfigBase):
"""
This base class implements a few boilerplate methods and provides some light structure for Databricks relations.
"""

@classmethod
def include_policy(cls) -> Policy:
return DatabricksIncludePolicy()

@classmethod
def quote_policy(cls) -> Policy:
return DatabricksQuotePolicy()
183 changes: 183 additions & 0 deletions dbt/adapters/databricks/relation_configs/materialized_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
from dataclasses import dataclass
from typing import Optional, Set

import agate
from dbt.adapters.relation_configs import (
RelationResults,
RelationConfigChange,
RelationConfigValidationMixin,
RelationConfigValidationRule,
)
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.relation import ComponentName
from dbt.exceptions import DbtRuntimeError

from dbt.adapters.databricks.relation_configs.base import DatabricksRelationConfigBase

from dbt.adapters.databricks.utils import evaluate_bool


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class DatabricksMaterializedViewConfig(DatabricksRelationConfigBase, RelationConfigValidationMixin):
"""
This config follow the specs found here:
https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-create-materialized-view.html
The following parameters are configurable by dbt:
- mv_name: name of the materialized view
- query: the query that defines the view
- partition:
- schedule
There are currently no non-configurable parameters.
"""

mv_name: str
schema_name: str
database_name: str
query: str
backup: bool = True
partition: None # to be done
schedule: bool = False

@property
def path(self) -> str:
return ".".join(
part
for part in [self.database_name, self.schema_name, self.mv_name]
if part is not None
)

# can be filled out later
# @property
# def validation_rules(self) -> Set[RelationConfigValidationRule]:
# # sort and dist rules get run by default with the mixin
# return {}

@classmethod
def from_dict(cls, config_dict) -> "DatabricksMaterializedViewConfig":
kwargs_dict = {
"mv_name": cls._render_part(ComponentName.Identifier, config_dict.get("mv_name")),
"schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")),
"database_name": cls._render_part(
ComponentName.Database, config_dict.get("database_name")
),
"query": config_dict.get("query"),
"schedule": config_dict.get("schedule"),
}

materialized_view: "DatabricksMaterializedViewConfig" = super().from_dict(kwargs_dict) # type: ignore
return materialized_view

@classmethod
def parse_model_node(cls, model_node: ModelNode) -> dict:
config_dict = {
"mv_name": model_node.identifier,
"schema_name": model_node.schema,
"database_name": model_node.database,
}

autorefresh_value = model_node.config.extra.get("schedule")
if autorefresh_value is not None:
config_dict["schedule"] = evaluate_bool(autorefresh_value)

if query := model_node.compiled_code:
config_dict.update({"query": query.strip()})

if model_node.config.get("partition"):
config_dict.update({"partition": DatabricksPartitionConfig.parse_model_node(model_node)})

return config_dict

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> dict:
"""
Translate agate objects from the database into a standard dictionary.
Args:
relation_results: the description of the materialized view from the database in this format:
{
"materialized_view": agate.Table(
agate.Row({
"database": "<database_name>",
"schema": "<schema_name>",
"table": "<name>",
"diststyle": "<diststyle/distkey>", # e.g. EVEN | KEY(column1) | AUTO(ALL) | AUTO(KEY(id)),
"sortkey1": "<column_name>",
"autorefresh: any("t", "f"),
})
),
"query": agate.Table(
agate.Row({"definition": "<query>")}
),
}
Additional columns in either value is fine, as long as `sortkey` and `sortstyle` are available.
Returns: a standard dictionary describing this `DatabricksMaterializedViewConfig` instance
"""
materialized_view: agate.Row = cls._get_first_row(
relation_results.get("materialized_view")
)
query: agate.Row = cls._get_first_row(relation_results.get("query"))

config_dict = {
"mv_name": materialized_view.get("table"),
"schema_name": materialized_view.get("schema"),
"database_name": materialized_view.get("database"),
"autorefresh": materialized_view.get("autorefresh"),
"query": cls._parse_query(query.get("definition")),
}

# the default for materialized views differs from the default for diststyle in general
# only set it if we got a value
if materialized_view.get("diststyle"):
config_dict.update(
{"dist": DatabricksDistConfig.parse_relation_results(materialized_view)}
)

# TODO: this only shows the first column in the sort key
if materialized_view.get("sortkey1"):
config_dict.update(
{"sort": DatabricksSortConfig.parse_relation_results(materialized_view)}
)

return config_dict

# @dataclass(frozen=True, eq=True, unsafe_hash=True)
# class DatabricksAutoRefreshConfigChange(RelationConfigChange):
# context: Optional[bool] = None

# @property
# def requires_full_refresh(self) -> bool:
# return False



@dataclass
class DatabricksMaterializedViewConfigChangeset:
dist: Optional[DatabricksPartitionConfigChange] = None
autorefresh: Optional[DatabricksAutoRefreshConfigChange] = None

@property
def requires_full_refresh(self) -> bool:
return any(
{
self.autorefresh.requires_full_refresh if self.autorefresh else False,
self.backup.requires_full_refresh if self.backup else False,
self.dist.requires_full_refresh if self.dist else False,
self.sort.requires_full_refresh if self.sort else False,
}
)

@property
def has_changes(self) -> bool:
return any(
{
self.backup if self.backup else False,
self.dist if self.dist else False,
self.sort if self.sort else False,
self.autorefresh if self.autorefresh else False,
}
)
4 changes: 0 additions & 4 deletions dbt/adapters/databricks/relation_configs/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
from dbt.adapters.base.relation import Policy
from dbt.dataclass_schema import StrEnum


MAX_CHARACTERS_IN_IDENTIFIER = 127


class DatabricksRelationType(StrEnum):
Table = "table"
View = "view"
Expand Down
25 changes: 24 additions & 1 deletion dbt/adapters/databricks/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import functools
import inspect
import re
from typing import Any, Callable, Type, TypeVar
from typing import Any, Callable, Type, TypeVar, Union

from dbt.adapters.base import BaseAdapter
from jinja2.runtime import Undefined
Expand Down Expand Up @@ -77,3 +77,26 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
def remove_ansi(line: str) -> str:
ansi_escape = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]")
return ansi_escape.sub("", line)

def evaluate_bool_str(value: str) -> bool:
value = value.strip().lower()
if value == "true":
return True
elif value == "false":
return False
else:
raise ValueError(f"Invalid boolean string value: {value}")


def evaluate_bool(value: Union[str, bool]) -> bool:
if not value:
return False
if isinstance(value, bool):
return value
elif isinstance(value, str):
return evaluate_bool_str(value)
else:
raise TypeError(
f"Invalid type for boolean evaluation, "
f"expecting boolean or str, recieved: {type(value)}"
)

0 comments on commit 4997771

Please sign in to comment.