Skip to content

Commit

Permalink
mostly fleshed out
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db committed Dec 18, 2023
1 parent b111ad9 commit 5e9fd83
Show file tree
Hide file tree
Showing 22 changed files with 896 additions and 141 deletions.
27 changes: 26 additions & 1 deletion dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.adapters.spark.impl import (
SparkAdapter,
DESCRIBE_TABLE_EXTENDED_MACRO_NAME,
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME,
KEY_TABLE_OWNER,
KEY_TABLE_STATISTICS,
Expand Down Expand Up @@ -55,6 +56,7 @@
)
from dbt.adapters.databricks.relation_configs.materialized_view import MaterializedViewConfig
from dbt.adapters.databricks.utils import redact_credentials, undefined_proof
from dbt.adapters.relation_configs.config_base import RelationResults


logger = AdapterLogger("Databricks")
Expand Down Expand Up @@ -697,5 +699,28 @@ def get_persist_doc_columns(
return return_columns

@available.parse_none
def materialized_view_from_model(self, model: ModelNode) -> MaterializedViewConfig:
def materialized_view_config_from_model(self, model: ModelNode) -> MaterializedViewConfig:
    """Build the materialized view configuration described by a model node.

    Delegates entirely to ``MaterializedViewConfig.from_model_node``.

    Args:
        model: The model node to read the configuration from.

    Returns:
        The configuration produced by ``MaterializedViewConfig.from_model_node``.
    """
    config = MaterializedViewConfig.from_model_node(model)
    return config  # type: ignore

@available.parse_none
def get_relation_config(self, relation: DatabricksRelation) -> MaterializedViewConfig:
    """Load the configuration of an existing relation.

    Only materialized views are handled: the relation is described via
    ``describe_materialized_view`` and the results are turned into a config
    with ``MaterializedViewConfig.from_results``.

    Args:
        relation: The relation whose configuration should be read.

    Returns:
        The configuration built from the describe results.

    Raises:
        dbt.exceptions.DbtRuntimeError: If ``relation.type`` is anything other
            than ``RelationType.MaterializedView``.
    """
    # Reject unsupported relation types up front so the happy path stays flat.
    if relation.type != RelationType.MaterializedView:
        raise dbt.exceptions.DbtRuntimeError(
            f"The method `DatabricksAdapter.get_relation_config` is not implemented "
            f"for the relation type: {relation.type}"
        )

    described = self.describe_materialized_view(relation)
    return MaterializedViewConfig.from_results(described)

def describe_materialized_view(self, relation: DatabricksRelation) -> RelationResults:
    """Collect the macro results that describe a materialized view.

    Three macros are executed, each receiving ``relation`` as its only keyword
    argument, and their results are stored under fixed keys:

    * ``"describe_extended"``: ``DESCRIBE_TABLE_EXTENDED_MACRO_NAME``
    * ``"information_schema.views"``: ``get_view_description``
    * ``"show_tblproperties"``: ``fetch_tbl_properties``

    Args:
        relation: The relation to describe.

    Returns:
        A mapping from the keys above to the corresponding macro results.
    """
    macro_args = {"relation": relation}
    # Dict displays evaluate their values top to bottom, so the macros run in
    # the order they are listed here.
    described: RelationResults = {
        "describe_extended": self.execute_macro(
            DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs=macro_args
        ),
        "information_schema.views": self.execute_macro(
            "get_view_description", kwargs=macro_args
        ),
        "show_tblproperties": self.execute_macro("fetch_tbl_properties", kwargs=macro_args),
    }
    return described
16 changes: 16 additions & 0 deletions dbt/adapters/databricks/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dbt.adapters.databricks.utils import remove_undefined
from dbt.utils import filter_null_values, classproperty
from dbt.exceptions import DbtRuntimeError
from dbt.adapters.base.relation import RelationType

KEY_TABLE_PROVIDER = "Provider"

Expand Down Expand Up @@ -65,6 +66,21 @@ def __pre_deserialize__(cls, data: Dict[Any, Any]) -> Dict[Any, Any]:
data["path"]["database"] = remove_undefined(data["path"]["database"])
return data

# Relation types listed as renameable: views and tables.
renameable_relations = frozenset({RelationType.View, RelationType.Table})

# Relation types that can be atomically replaced
# (e.g. `CREATE OR REPLACE my_relation..` versus `DROP` and `CREATE`).
replaceable_relations = frozenset({RelationType.View, RelationType.Table})

def has_information(self) -> bool:
    """Report whether ``self.metadata`` is set, i.e. is not ``None``."""
    metadata = self.metadata
    return metadata is not None

Expand Down
77 changes: 63 additions & 14 deletions dbt/adapters/databricks/relation_configs/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import ClassVar, Dict, Generic, List, TypeVar
from typing import ClassVar, Dict, Generic, List, Optional, TypeVar
from typing_extensions import Self

from dbt.adapters.relation_configs.config_base import RelationConfigBase, RelationResults
from dbt.contracts.graph.nodes import ModelNode
from dbt.exceptions import DbtRuntimeError

from dbt.adapters.relation_configs.config_change import RelationConfigChange
from dbt.adapters.relation_configs.config_change import (
RelationConfigChange,
RelationConfigChangeAction,
)


@dataclass(frozen=True, eq=True, unsafe_hash=True)
Expand All @@ -14,10 +19,60 @@ class DatabricksComponentConfig(ABC):
def to_sql_clause(self) -> str:
raise NotImplementedError("Must be implemented by subclass")

def get_diff(self, other: "DatabricksComponentConfig") -> Self:
if not isinstance(other, self.__class__):
raise DbtRuntimeError(
f"Cannot diff {self.__class__.__name__} with {other.__class__.__name__}"
)
return self


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class DatabricksAlterableComponentConfig(DatabricksComponentConfig, ABC):
    """Component config that can additionally be rendered as ALTER clauses.

    ``DatabricksConfigChange.requires_full_refresh`` checks for this type: a
    change whose context is an instance of this class does not require a
    full refresh.
    """

    @abstractmethod
    def to_alter_sql_clauses(self) -> List[str]:
        """Return the SQL clauses for altering this component.

        Subclasses must override this; the base implementation always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError("Must be implemented by subclass")


Component = TypeVar("Component", bound=DatabricksComponentConfig)


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class DatabricksConfigChange(RelationConfigChange, Generic[Component]):
    """A detected change to a single component of a relation's configuration."""

    # The component configuration carried by this change.
    context: Component

    @property
    def requires_full_refresh(self) -> bool:
        """Whether applying this change requires a full refresh.

        True unless the context is a ``DatabricksAlterableComponentConfig``.
        """
        can_alter_in_place = isinstance(self.context, DatabricksAlterableComponentConfig)
        return not can_alter_in_place

    @classmethod
    def get_change(cls, new: Component, existing: Component) -> Optional[Self]:
        """Build a change describing how ``new`` differs from ``existing``.

        Args:
            new: The desired component configuration.
            existing: The component configuration to compare against.

        Returns:
            An ``alter`` change whose context is ``new.get_diff(existing)``,
            or ``None`` when the two components compare equal.
        """
        if new != existing:
            # The diff is only computed once a difference has been detected.
            diff = new.get_diff(existing)
            return cls(RelationConfigChangeAction.alter, diff)
        return None


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class DatabricksRelationChangeSet:
    """The collection of component changes detected for one relation."""

    # Individual component changes that make up this change set.
    # NOTE(review): this field is a list, so calling hash() on an instance
    # raises TypeError even though unsafe_hash=True generates __hash__.
    changes: List[DatabricksConfigChange]

    @property
    def requires_full_refresh(self) -> bool:
        """True if at least one contained change requires a full refresh."""
        for change in self.changes:
            if change.requires_full_refresh:
                return True
        return False

    @property
    def has_changes(self) -> bool:
        """True if the change set contains at least one change."""
        return len(self.changes) != 0

    def get_alter_sql_clauses(self) -> List[str]:
        """Return the ALTER clauses of every change, in change order.

        Raises:
            AssertionError: If any change requires a full refresh (the check
                is an ``assert`` and is therefore skipped under ``python -O``).
        """
        assert (
            not self.requires_full_refresh
        ), "Cannot alter a relation when changes requires a full refresh"

        clauses: List[str] = []
        for change in self.changes:
            clauses.extend(change.context.to_alter_sql_clauses())
        return clauses


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class DatabricksComponentProcessor(ABC, Generic[Component]):
name: ClassVar[str]
Expand All @@ -33,6 +88,9 @@ def from_model_node(cls, model_node: ModelNode) -> Component:
raise NotImplementedError("Must be implemented by subclass")


T = TypeVar("T", bound=DatabricksComponentConfig)


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class DatabricksRelationConfigBase(RelationConfigBase):
config_components: ClassVar[List[type[DatabricksComponentProcessor]]]
Expand All @@ -53,15 +111,6 @@ def from_results(cls, results: RelationResults) -> RelationConfigBase:

return cls.from_dict(config_dict)


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class DatabricksRelationChangeSet:
changes: List[RelationConfigChange]

@property
def requires_full_refresh(self) -> bool:
return any(change.requires_full_refresh for change in self.changes)

@property
def has_changes(self) -> bool:
return len(self.changes) > 0
@abstractmethod
def get_changeset(self, existing: Self) -> Optional[DatabricksRelationChangeSet]:
    """Compare this configuration against ``existing``.

    Subclasses must override this; the base implementation always raises
    ``NotImplementedError``.

    Args:
        existing: The configuration to compare against.

    Returns:
        A change set describing the differences, or ``None``. The visible
        ``MaterializedViewConfig`` implementation returns ``None`` when no
        component changed.
    """
    raise NotImplementedError("Must be implemented by subclass")
10 changes: 0 additions & 10 deletions dbt/adapters/databricks/relation_configs/comment.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
DatabricksComponentProcessor,
)
from dbt.adapters.relation_configs.config_base import RelationResults
from dbt.adapters.relation_configs.config_change import RelationConfigChange


@dataclass(frozen=True, eq=True, unsafe_hash=True)
Expand All @@ -34,12 +33,3 @@ def from_results(cls, results: RelationResults) -> CommentConfig:
@classmethod
def from_model_node(cls, model_node: ModelNode) -> CommentConfig:
    """Create a ``CommentConfig`` from the model node's ``description``."""
    description = model_node.description
    return CommentConfig(description)


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class CommentConfigChange(RelationConfigChange):
context: Optional[CommentConfig] = None

@property
def requires_full_refresh(self) -> bool:
return False
37 changes: 36 additions & 1 deletion dbt/adapters/databricks/relation_configs/materialized_view.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from dataclasses import dataclass
from typing import List, Optional

from dbt.exceptions import DbtRuntimeError
from dbt.adapters.databricks.relation_configs.base import (
DatabricksConfigChange,
DatabricksRelationChangeSet,
DatabricksRelationConfigBase,
)
from dbt.adapters.databricks.relation_configs.comment import (
Expand All @@ -18,13 +23,43 @@
RefreshConfig,
RefreshProcessor,
)
from dbt.adapters.databricks.relation_configs.tblproperties import (
TblPropertiesConfig,
TblPropertiesProcessor,
)


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class MaterializedViewConfig(DatabricksRelationConfigBase):
config_components = [PartitionedByProcessor, CommentProcessor, RefreshProcessor, QueryProcessor]
config_components = [
PartitionedByProcessor,
CommentProcessor,
TblPropertiesProcessor,
RefreshProcessor,
QueryProcessor,
]

partition_by: PartitionedByConfig
comment: CommentConfig
tblproperties: TblPropertiesConfig
refresh: RefreshConfig
query: QueryConfig

def get_changeset(
    self, existing: DatabricksRelationConfigBase
) -> Optional[DatabricksRelationChangeSet]:
    """Compute the changes needed to move ``existing`` to this configuration.

    ``self`` is treated as the desired (new) configuration and ``existing``
    as the configuration to compare against. The partition, comment, refresh
    and query components are compared pairwise via
    ``DatabricksConfigChange.get_change(new, existing)``.

    Args:
        existing: The configuration to compare against; must be a
            ``MaterializedViewConfig``.

    Returns:
        A ``DatabricksRelationChangeSet`` holding one change per differing
        component, or ``None`` when no compared component differs.

    Raises:
        DbtRuntimeError: If ``existing`` is not a ``MaterializedViewConfig``.
    """
    if not isinstance(existing, MaterializedViewConfig):
        raise DbtRuntimeError(
            f"Invalid comparison between MaterializedViewConfig and {type(existing)}"
        )

    # (new, existing) pairs, in the argument order get_change() declares.
    # Passing them the other way round makes each change carry the existing
    # component as its context instead of the desired one.
    # NOTE(review): tblproperties is a declared component but is not compared
    # here; confirm whether that omission is intentional.
    component_pairs = (
        (self.partition_by, existing.partition_by),
        (self.comment, existing.comment),
        (self.refresh, existing.refresh),
        (self.query, existing.query),
    )

    candidates: List[Optional[DatabricksConfigChange]] = [
        DatabricksConfigChange.get_change(new, current) for new, current in component_pairs
    ]
    changes = [change for change in candidates if change is not None]

    if changes:
        return DatabricksRelationChangeSet(changes)
    return None
12 changes: 1 addition & 11 deletions dbt/adapters/databricks/relation_configs/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import ClassVar, List, Optional

from dbt.adapters.relation_configs.config_base import RelationResults
from dbt.adapters.relation_configs.config_change import RelationConfigChange
from dbt.contracts.graph.nodes import ModelNode
from dbt.adapters.databricks.relation_configs.base import (
DatabricksComponentConfig,
Expand All @@ -22,7 +21,7 @@ def to_sql_clause(self) -> str:


class PartitionedByProcessor(DatabricksComponentProcessor):
name: ClassVar[str] = "partitioned_by"
name: ClassVar[str] = "partition_by"

@classmethod
def from_results(cls, results: RelationResults) -> PartitionedByConfig:
Expand All @@ -44,12 +43,3 @@ def from_model_node(cls, model_node: ModelNode) -> PartitionedByConfig:
if isinstance(partition_by, str):
return PartitionedByConfig([partition_by])
return PartitionedByConfig(partition_by)


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class PartitionedByConfigChange(RelationConfigChange):
context: Optional[PartitionedByConfig] = None

@property
def requires_full_refresh(self) -> bool:
return True
14 changes: 3 additions & 11 deletions dbt/adapters/databricks/relation_configs/query.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from dataclasses import dataclass
from typing import Optional
from typing import ClassVar
from dbt.adapters.relation_configs.config_base import RelationResults
from dbt.contracts.graph.nodes import ModelNode
from dbt.adapters.databricks.relation_configs.base import (
DatabricksComponentConfig,
DatabricksComponentProcessor,
)
from dbt.adapters.relation_configs.config_change import RelationConfigChange
from dbt.exceptions import DbtRuntimeError


Expand All @@ -19,6 +18,8 @@ def to_sql_clause(self) -> str:


class QueryProcessor(DatabricksComponentProcessor[QueryConfig]):
name: ClassVar[str] = "query"

@classmethod
def from_results(cls, result: RelationResults) -> QueryConfig:
row = result["information_schema.views"]
Expand All @@ -32,12 +33,3 @@ def from_model_node(cls, model_node: ModelNode) -> QueryConfig:
return QueryConfig(query.strip())
else:
raise DbtRuntimeError(f"Cannot compile model {model_node.unique_id} with no SQL query")


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class QueryConfigChange(RelationConfigChange):
context: Optional[QueryConfig] = None

@property
def requires_full_refresh(self) -> bool:
return True
Loading

0 comments on commit 5e9fd83

Please sign in to comment.