
Commit

feat: store SparkColumns in the SparkRelation, add needs_information to get_relation
TalkWIthKeyboard committed May 21, 2022
1 parent 101c6dd commit d29bd47
Showing 3 changed files with 16 additions and 25 deletions.
35 changes: 12 additions & 23 deletions dbt/adapters/spark/impl.py
@@ -209,12 +209,20 @@ def list_relations_without_caching(
         return relations

     def get_relation(
-        self, database: Optional[str], schema: str, identifier: str
+        self,
+        database: Optional[str],
+        schema: str,
+        identifier: str,
+        needs_information=False
     ) -> Optional[BaseRelation]:
         if not self.Relation.include_policy.database:
             database = None

         cached = super().get_relation(database, schema, identifier)
+
+        if not needs_information:
+            return cached
+
         return self._set_relation_information(cached) if cached else None

     def parse_describe_extended(
@@ -271,7 +279,7 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
         else:
             updated_relation = self._set_relation_information(cached_relation)

-        return self._get_spark_columns(updated_relation)
+        return updated_relation.columns

     def _get_updated_relation(self, relation: BaseRelation) -> Optional[SparkRelation]:
         metadata = None
@@ -308,7 +316,7 @@ def _get_updated_relation(self, relation: BaseRelation) -> Optional[SparkRelation]:
             is_hudi=(provider == 'hudi'),
             owner=metadata.get(KEY_TABLE_OWNER),
             stats=metadata.get(KEY_TABLE_STATISTICS),
-            columns={x.column: x.dtype for x in columns}
+            columns=columns
         )

     def _set_relation_information(self, relation: SparkRelation) -> SparkRelation:
@@ -321,31 +329,12 @@ def _set_relation_information(self, relation: SparkRelation) -> SparkRelation:
         self.cache.update_relation(updated_relation)
         return updated_relation

-    @staticmethod
-    def _get_spark_columns(
-        relation: Optional[SparkRelation]
-    ) -> List[SparkColumn]:
-        if not relation:
-            return []
-
-        return [SparkColumn(
-            table_database=None,
-            table_schema=relation.schema,
-            table_name=relation.name,
-            table_type=relation.type,
-            table_owner=relation.owner,
-            table_stats=relation.stats,
-            column=name,
-            column_index=idx,
-            dtype=dtype
-        ) for idx, (name, dtype) in enumerate(relation.columns.items())]
-
     def _get_columns_for_catalog(
         self, relation: SparkRelation
     ) -> Iterable[Dict[str, Any]]:
         updated_relation = self._set_relation_information(relation)

-        for column in self._get_spark_columns(updated_relation):
+        for column in updated_relation.columns:
             # convert SparkColumns into catalog dicts
             as_dict = column.to_column_dict()
             as_dict["column_name"] = as_dict.pop("column", None)
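
For context on how the new flag is meant to be used, here is a minimal sketch of the two call styles. It is illustrative only: the adapter handle, schema, and table names are made up and not part of this commit; the behaviour shown follows directly from the get_relation diff above.

    # Hypothetical call sites; `adapter`, "analytics", and "orders" are
    # illustrative names, not part of this commit.

    # Default behaviour: return whatever the relation cache already holds,
    # without resolving owner/stats/columns.
    relation = adapter.get_relation(database=None, schema="analytics", identifier="orders")

    # needs_information=True: the adapter additionally runs
    # _set_relation_information on the cached relation, so the returned
    # SparkRelation carries SparkColumn objects on .columns.
    relation = adapter.get_relation(
        database=None,
        schema="analytics",
        identifier="orders",
        needs_information=True,
    )
    if relation:
        for column in relation.columns:  # List[SparkColumn] after this change
            print(column.column, column.dtype)
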
4 changes: 3 additions & 1 deletion dbt/adapters/spark/relation.py
@@ -5,6 +5,8 @@
 from dbt.adapters.base.relation import BaseRelation, Policy
 from dbt.exceptions import RuntimeException

+from dbt.adapters.spark.column import SparkColumn
+

 @dataclass
 class SparkQuotePolicy(Policy):
@@ -29,7 +31,7 @@ class SparkRelation(BaseRelation):
     is_hudi: Optional[bool] = None
     owner: Optional[str] = None
     stats: Optional[str] = None
-    columns: Dict[str, str] = field(default_factory=lambda: {})
+    columns: List[SparkColumn] = field(default_factory=lambda: [])

     def __post_init__(self):
         if self.database != self.schema and self.database:
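
A small illustration of what the type change on SparkRelation.columns means for consumers. The values below are made up; the constructor keywords are only those already visible in the removed _get_spark_columns helper above.

    from dbt.adapters.spark.column import SparkColumn

    # Before this commit, relation.columns was a bare name -> dtype mapping:
    #     {"id": "bigint", "amount": "double"}
    # After it, the relation stores fully-built SparkColumn objects:
    columns = [
        SparkColumn(
            table_database=None,
            table_schema="analytics",   # illustrative
            table_name="orders",        # illustrative
            table_type="table",
            table_owner="etl_user",
            table_stats=None,
            column="id",
            column_index=0,
            dtype="bigint",
        ),
    ]

    # The old mapping is still one comprehension away where only names and
    # dtypes are needed:
    name_to_dtype = {c.column: c.dtype for c in columns}
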
2 changes: 1 addition & 1 deletion dbt/include/spark/macros/materializations/table.sql
@@ -2,7 +2,7 @@

 {%- set identifier = model['alias'] -%}

-{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
+{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier, needs_information=True) -%}
 {%- set target_relation = api.Relation.create(identifier=identifier,
                                               schema=schema,
                                               database=database,
