[CT-1114] remove Cache call from get_columns_in_relation (#451)
* initial push: change get_columns_in_relation to fix cache inconsistencies during on_schema_change

* trying to clear mypy issues

* changelog

* add a reference to columns before it is called on by the macro
McKnight-42 authored Sep 15, 2022
1 parent 571a6ef commit 60f47d5
Showing 3 changed files with 22 additions and 30 deletions.
8 changes: 8 additions & 0 deletions .changes/unreleased/Fixes-20220914-010520.yaml
@@ -0,0 +1,8 @@
+kind: Fixes
+body: change to get_columns_in_relation to fix cache inconsistencies in incremental
+  models that caused failures with on_schema_change
+time: 2022-09-14T01:05:20.312981-05:00
+custom:
+  Author: McKnight-42
+  Issue: "447"
+  PR: "451"
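
For context, here is a minimal, self-contained Python sketch of the inconsistency this entry describes; it is hypothetical code, not from dbt-spark (RelationCache and the table names are invented for illustration). Column metadata captured in a run-level cache goes stale once on_schema_change alters the table, so a cache-first lookup reports the old schema.

from typing import Dict, List, Optional

class RelationCache:
    """Stands in for dbt's run-level relation cache (hypothetical)."""

    def __init__(self) -> None:
        self._columns: Dict[str, List[str]] = {}

    def store(self, name: str, columns: List[str]) -> None:
        self._columns[name] = list(columns)

    def get(self, name: str) -> Optional[List[str]]:
        return self._columns.get(name)

live_table: List[str] = ["id", "amount"]  # schema Spark would report live
cache = RelationCache()
cache.store("orders", live_table)         # cached early in the run

live_table.append("status")               # on_schema_change adds a column

print(cache.get("orders"))  # ['id', 'amount']           <- stale cached answer
print(live_table)           # ['id', 'amount', 'status'] <- live describe answer
# Serving columns from the cache at this point is what broke incremental runs;
# always executing the describe macro (see the impl.py diff below) avoids it.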
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -5,7 +5,7 @@ exclude: '^tests/.*'

# Force all unspecified python hooks to run python 3.8
default_language_version:
-  python: python3.8
+  python: python3

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
42 changes: 13 additions & 29 deletions dbt/adapters/spark/impl.py
@@ -207,36 +207,20 @@ def find_table_information_separator(rows: List[dict]) -> int:
return pos

def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
-        cached_relations = self.cache.get_relations(relation.database, relation.schema)
-        cached_relation = next(
-            (
-                cached_relation
-                for cached_relation in cached_relations
-                if str(cached_relation) == str(relation)
-            ),
-            None,
-        )
         columns = []
-        if cached_relation and cached_relation.information:
-            columns = self.parse_columns_from_information(cached_relation)
-        if not columns:
-            # in open-source Delta, the 'show table extended' query output doesn't
-            # return the relation's schema. if columns are empty from the cache,
-            # use the get_columns_in_relation spark macro,
-            # which executes a 'describe extended tablename' query
-            try:
-                rows: List[agate.Row] = self.execute_macro(
-                    GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
-                )
-                columns = self.parse_describe_extended(relation, rows)
-            except dbt.exceptions.RuntimeException as e:
-                # spark would throw an error when the table doesn't exist, where other
-                # CDWs would just return an empty list; normalize the behavior here
-                errmsg = getattr(e, "msg", "")
-                if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
-                    pass
-                else:
-                    raise e
+        try:
+            rows: List[agate.Row] = self.execute_macro(
+                GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
+            )
+            columns = self.parse_describe_extended(relation, rows)
+        except dbt.exceptions.RuntimeException as e:
+            # spark would throw an error when the table doesn't exist, where other
+            # CDWs would just return an empty list; normalize the behavior here
+            errmsg = getattr(e, "msg", "")
+            if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
+                pass
+            else:
+                raise e

# strip hudi metadata columns.
columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]
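
Assembled from the hunk above plus its unchanged context, the method after this commit reads roughly as follows. Treat it as a sketch: the imports, the GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME constant, and the collapsed tail of the method live in dbt/adapters/spark/impl.py and are not fully shown on this page.

    def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
        columns = []
        # always ask Spark directly ('describe extended' via the raw macro)
        # rather than trusting possibly stale cached relation information
        try:
            rows: List[agate.Row] = self.execute_macro(
                GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
            )
            columns = self.parse_describe_extended(relation, rows)
        except dbt.exceptions.RuntimeException as e:
            # spark would throw an error when the table doesn't exist, where other
            # CDWs would just return an empty list; normalize the behavior here
            errmsg = getattr(e, "msg", "")
            if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
                pass
            else:
                raise e

        # strip hudi metadata columns.
        columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]
        return columns  # assumed; the rest of the method is collapsed in the diff view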
