optimize the performance of list_relations_without_caching #342

Closed
Changes from all commits · 22 commits
c67c5a3 option 2: show tables + show views (jtcohen6, Mar 9, 2022)
ecc1847 simplify (jtcohen6, Mar 11, 2022)
4a3fa57 Updating requirements to point to release branch (leahwicz, Apr 12, 2022)
6862c7f Backport table not exist, Bumping version to 1.1.0rc1 (#333) (ChenyuLInx, Apr 13, 2022)
ac50f23 Bumping version to 1.1.0 (#341) (github-actions[bot], Apr 28, 2022)
3049247 Merge commit 'ac50f231cae3fa61795259eef0204a613fd68f85' into jerco/wi… (TalkWIthKeyboard, Apr 29, 2022)
bf5a1d6 fix: the tbl name in the result of show tables/views (TalkWIthKeyboard, Apr 29, 2022)
ba1c87b fix: select existing ones from namespace and database (TalkWIthKeyboard, May 2, 2022)
7b92fab Merge branch 'main' into talkwithkeyboard/faster-caching (TalkWIthKeyboard, May 2, 2022)
7a5428e chore: upgrade the spark version in the integration to 3.2.1 (TalkWIthKeyboard, May 4, 2022)
3301123 fix: the wrong use of agate.Table (TalkWIthKeyboard, May 7, 2022)
1fd6671 get extended information and update cached relation when need it (TalkWIthKeyboard, May 14, 2022)
969e0a2 Merge branch 'main' into talkwithkeyboard/faster-caching (TalkWIthKeyboard, May 14, 2022)
e830b24 polish (TalkWIthKeyboard, May 14, 2022)
2672057 fix NPE while get_columns_in_relation (TalkWIthKeyboard, May 14, 2022)
9fc5732 add more debug message (TalkWIthKeyboard, May 15, 2022)
6cd6f6d fix NPE while get relation (TalkWIthKeyboard, May 15, 2022)
345239b fix NPE while get_columns_in_relation (TalkWIthKeyboard, May 15, 2022)
101c6dd polish (TalkWIthKeyboard, May 15, 2022)
d29bd47 feat: store SparkColumns in the SparkRelation, add needs_information … (TalkWIthKeyboard, May 21, 2022)
24e223c Merge branch 'main' into talkwithkeyboard/faster-caching (TalkWIthKeyboard, May 21, 2022)
d2357b7 support dispatch pattern for list marco (TalkWIthKeyboard, May 24, 2022)
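
The gist of the optimization, sketched below under stated assumptions: the old implementation listed a schema with a single expensive `show table extended in <schema> like '*'` query, while this PR issues the much cheaper `show tables` / `show views` pair and defers extended metadata to per-relation `describe table extended` calls. A minimal PySpark illustration (the session and `my_schema` are hypothetical, not part of the diff; `SHOW VIEWS` requires Spark 3.x, which is why the integration image is bumped to 3.2.1 in commit 7a5428e):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Old strategy: one query that forces Spark to compute extended metadata
# (provider, owner, stats, schema) for every relation in the schema.
extended = spark.sql("SHOW TABLE EXTENDED IN my_schema LIKE '*'")

# New strategy: two cheap listing queries; a view is anything that also
# appears in the second result set.
tables = spark.sql("SHOW TABLES IN my_schema")  # namespace/database, tableName
views = spark.sql("SHOW VIEWS IN my_schema")    # namespace, viewName

# Extended metadata is now fetched lazily, one relation at a time, and only
# when a caller actually needs it:
details = spark.sql("DESCRIBE TABLE EXTENDED my_schema.my_table")
```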
4 changes: 2 additions & 2 deletions dbt/adapters/spark/column.py

```diff
@@ -2,14 +2,14 @@
 from typing import Any, Dict, Optional, TypeVar, Union
 
 from dbt.adapters.base.column import Column
-from dbt.dataclass_schema import dbtClassMixin
+from dbt.contracts.relation import FakeAPIObject
 from hologram import JsonDict
 
 Self = TypeVar("Self", bound="SparkColumn")
 
 
 @dataclass
-class SparkColumn(dbtClassMixin, Column):
+class SparkColumn(FakeAPIObject, Column):
     table_database: Optional[str] = None
     table_schema: Optional[str] = None
     table_name: Optional[str] = None
```
227 changes: 147 additions & 80 deletions dbt/adapters/spark/impl.py

```diff
@@ -1,11 +1,15 @@
 import re
 from concurrent.futures import Future
 from dataclasses import dataclass
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, Iterable, List, Optional, Union, Tuple
 from typing_extensions import TypeAlias
 
 import agate
+from dbt.adapters.cache import RelationsCache, _CachedRelation
 from dbt.contracts.relation import RelationType
+from dbt.events.base_types import DebugLevel, Cache
+from dbt.adapters.reference_keys import _ReferenceKey, _make_key
+from dbt.events.functions import fire_event
 
 import dbt
 import dbt.exceptions
@@ -25,12 +29,14 @@
 
 GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME = "spark__get_columns_in_relation_raw"
 LIST_SCHEMAS_MACRO_NAME = "list_schemas"
-LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
+LIST_TABLES_MACRO_NAME = "list_tables_without_caching"
+LIST_VIEWS_MACRO_NAME = "list_views_without_caching"
 DROP_RELATION_MACRO_NAME = "drop_relation"
 FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties"
 
 KEY_TABLE_OWNER = "Owner"
 KEY_TABLE_STATISTICS = "Statistics"
+KEY_TABLE_PROVIDER = "Provider"
 
 
 @dataclass
```
```diff
@@ -44,6 +50,46 @@ class SparkConfig(AdapterConfig):
     merge_update_columns: Optional[str] = None
 
 
+@dataclass
+class UpdateRelation(DebugLevel, Cache):
+    relation: _ReferenceKey
+    code: str = "E038"
+
+    def message(self) -> str:
+        return f"Updating relation: {str(self.relation)}"
+
+
+@dataclass
+class UpdateMissingRelation(DebugLevel, Cache):
+    relation: _ReferenceKey
+    code: str = "E039"
+
+    def message(self) -> str:
+        return f"updated a nonexistent relation: {str(self.relation)}"
+
+
+class SparkRelationsCache(RelationsCache):
+    def _update(self, relation: _CachedRelation):
+        key = relation.key()
+
+        if key not in self.relations:
+            fire_event(UpdateMissingRelation(relation=key))
+            return
+
+        self.relations[key].inner = relation.inner
+
+    def update_relation(self, relation):
+        """Swap the cached relation's inner object for the given relation.
+
+        :param BaseRelation relation: The underlying relation.
+        """
+        cached = _CachedRelation(relation)
+        fire_event(UpdateRelation(relation=_make_key(cached)))
+
+        with self.lock:
+            self._update(cached)
+
+
 class SparkAdapter(SQLAdapter):
     COLUMN_NAMES = (
         "table_database",
```
```diff
@@ -81,6 +127,10 @@ class SparkAdapter(SQLAdapter):
     ConnectionManager: TypeAlias = SparkConnectionManager
     AdapterSpecificConfigs: TypeAlias = SparkConfig
 
+    def __init__(self, config):
+        super().__init__(config)
+        self.cache = SparkRelationsCache()
+
     @classmethod
     def date_function(cls) -> str:
         return "current_timestamp()"
```
```diff
@@ -124,9 +174,16 @@ def add_schema_to_cache(self, schema) -> str:
     def list_relations_without_caching(
         self, schema_relation: SparkRelation
     ) -> List[SparkRelation]:
-        kwargs = {"schema_relation": schema_relation}
+        kwargs = {"relation": schema_relation}
         try:
-            results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
+            tables = self.execute_macro(
+                LIST_TABLES_MACRO_NAME,
+                kwargs=kwargs
+            )
+            views = self.execute_macro(
+                LIST_VIEWS_MACRO_NAME,
+                kwargs=kwargs
+            )
         except dbt.exceptions.RuntimeException as e:
             errmsg = getattr(e, "msg", "")
             if f"Database '{schema_relation}' not found" in errmsg:
@@ -137,39 +194,40 @@ def list_relations_without_caching(
             return []
 
         relations = []
-        for row in results:
-            if len(row) != 4:
-                raise dbt.exceptions.RuntimeException(
-                    f'Invalid value from "show table extended ...", '
-                    f"got {len(row)} values, expected 4"
-                )
-            _schema, name, _, information = row
-            rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
-            is_delta = "Provider: delta" in information
-            is_hudi = "Provider: hudi" in information
+        view_names = views.columns["viewName"].values()
+
+        for tbl in tables:
+            rel_type = RelationType("view" if tbl["tableName"] in view_names else "table")
+            _schema = tbl["namespace"] if "namespace" in tables.column_names else tbl["database"]
             relation = self.Relation.create(
                 schema=_schema,
-                identifier=name,
+                identifier=tbl["tableName"],
                 type=rel_type,
-                information=information,
-                is_delta=is_delta,
-                is_hudi=is_hudi,
             )
             relations.append(relation)
 
         return relations
```
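
For readers unfamiliar with agate, a small sketch of what the loop above operates on; the row values are hypothetical, and the column names (`tableName`, `namespace`, `viewName`) follow what the code assumes the two macros return:

```python
import agate

tables = agate.Table.from_object([
    {"namespace": "analytics", "tableName": "events", "isTemporary": False},
    {"namespace": "analytics", "tableName": "active_users", "isTemporary": False},
])
views = agate.Table.from_object([
    {"namespace": "analytics", "viewName": "active_users", "isTemporary": False},
])

view_names = views.columns["viewName"].values()  # -> ('active_users',)
for tbl in tables.rows:
    kind = "view" if tbl["tableName"] in view_names else "table"
    # events -> table, active_users -> view
```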

```diff
     def get_relation(
-        self, database: Optional[str], schema: str, identifier: str
+        self,
+        database: Optional[str],
+        schema: str,
+        identifier: str,
+        needs_information=False
     ) -> Optional[BaseRelation]:
         if not self.Relation.include_policy.database:
             database = None
 
-        return super().get_relation(database, schema, identifier)
+        cached = super().get_relation(database, schema, identifier)
+
+        if not needs_information:
+            return cached
+
+        return self._set_relation_information(cached) if cached else None
```
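
A caller-side sketch of the new flag, assuming `adapter` is a SparkAdapter and the names are hypothetical: the cheap path answers existence checks from the cache, and only `needs_information=True` pays for a `describe table extended` round trip:

```python
# Existence check: served straight from the relation cache, no extra query.
rel = adapter.get_relation(database=None, schema="analytics", identifier="events")

# Metadata-hungry callers opt in; this may trigger one
# 'describe table extended' query if the cached entry is not hydrated yet.
rel = adapter.get_relation(
    database=None,
    schema="analytics",
    identifier="events",
    needs_information=True,
)
if rel is not None and rel.is_delta:
    pass  # e.g. choose a MERGE-based incremental strategy
```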

```diff
     def parse_describe_extended(
         self, relation: Relation, raw_rows: List[agate.Row]
-    ) -> List[SparkColumn]:
+    ) -> Tuple[Dict[str, Any], List[SparkColumn]]:
         # Convert the Row to a dict
         dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows]
         # Find the separator between the rows and the metadata provided
@@ -182,20 +240,17 @@ def parse_describe_extended(
 
         raw_table_stats = metadata.get(KEY_TABLE_STATISTICS)
         table_stats = SparkColumn.convert_table_stats(raw_table_stats)
-        return [
-            SparkColumn(
-                table_database=None,
-                table_schema=relation.schema,
-                table_name=relation.name,
-                table_type=relation.type,
-                table_owner=str(metadata.get(KEY_TABLE_OWNER)),
-                table_stats=table_stats,
-                column=column["col_name"],
-                column_index=idx,
-                dtype=column["data_type"],
-            )
-            for idx, column in enumerate(rows)
-        ]
+        return metadata, [SparkColumn(
+            table_database=None,
+            table_schema=relation.schema,
+            table_name=relation.name,
+            table_type=relation.type,
+            table_owner=str(metadata.get(KEY_TABLE_OWNER)),
+            table_stats=table_stats,
+            column=column['col_name'],
+            column_index=idx,
+            dtype=column['data_type'],
+        ) for idx, column in enumerate(rows)]
 
     @staticmethod
     def find_table_information_separator(rows: List[dict]) -> int:
```
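
To make the new return shape concrete, here is a small sketch of the input this parser sees and the two values it now returns; the row contents are illustrative, modeled on typical `DESCRIBE TABLE EXTENDED` output, and are not taken from the PR:

```python
# DESCRIBE TABLE EXTENDED yields (col_name, data_type, comment) triples:
# column rows first, then a separator row, then metadata key/value rows.
raw_rows = [
    {"col_name": "id", "data_type": "bigint", "comment": None},
    {"col_name": "ts", "data_type": "timestamp", "comment": None},
    {"col_name": "", "data_type": "", "comment": ""},  # separator row
    {"col_name": "# Detailed Table Information", "data_type": "", "comment": ""},
    {"col_name": "Owner", "data_type": "dbt", "comment": ""},
    {"col_name": "Provider", "data_type": "delta", "comment": ""},
    {"col_name": "Statistics", "data_type": "1024 bytes", "comment": ""},
]

# After the split at the separator, the method now returns both halves:
# metadata -> {"# Detailed Table Information": "", "Owner": "dbt",
#              "Provider": "delta", "Statistics": "1024 bytes"}
# columns  -> [SparkColumn(column="id", dtype="bigint", ...),
#              SparkColumn(column="ts", dtype="timestamp", ...)]
```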
```diff
@@ -216,60 +271,72 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
             ),
             None,
         )
```

A reviewer (Contributor) commented on this hunk: "Given the issues we've seen around column caching (#431), I think we need to fully disable pulling from the cache in get_columns_in_relation. That is, until we're ready to roll out column-level caching as a real feature. That would require us to add cache invalidation logic to macros such as alter_column_type and alter_relation_add_remove_columns."
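
A hedged sketch of the invalidation the reviewer describes; none of this is in the PR, and the helper name is hypothetical. Any macro that changes a relation's schema would need to blank out the cached column info so the next read re-describes the table:

```python
from dataclasses import replace

def invalidate_cached_columns(adapter, relation):
    # Hypothetical helper: clear owner/stats/columns so has_information()
    # turns false and _set_relation_information re-queries Spark.
    stale = replace(relation, owner=None, stats=None, columns=[])
    adapter.cache.update_relation(stale)
```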

```diff
 
-        columns = []
-        if cached_relation and cached_relation.information:
-            columns = self.parse_columns_from_information(cached_relation)
-        if not columns:
-            # in open source delta 'show table extended' query output doesnt
-            # return relation's schema. if columns are empty from cache,
-            # use get_columns_in_relation spark macro
-            # which would execute 'describe extended tablename' query
-            try:
-                rows: List[agate.Row] = self.execute_macro(
-                    GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
-                )
-                columns = self.parse_describe_extended(relation, rows)
-            except dbt.exceptions.RuntimeException as e:
-                # spark would throw error when table doesn't exist, where other
-                # CDW would just return and empty list, normalizing the behavior here
-                errmsg = getattr(e, "msg", "")
-                if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
-                    pass
-                else:
-                    raise e
-
-        # strip hudi metadata columns.
-        columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]
-        return columns
+        if not cached_relation:
+            updated_relation = self._get_updated_relation(relation)
+            if updated_relation:
+                self.cache.add(updated_relation)
+        else:
+            updated_relation = self._set_relation_information(cached_relation)
+
+        return updated_relation.columns if updated_relation is not None else []
+
+    def _get_updated_relation(self, relation: BaseRelation) -> Optional[SparkRelation]:
+        metadata = None
+        columns = []
+
+        try:
+            rows: List[agate.Row] = self.execute_macro(
+                GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
+            )
+            metadata, columns = self.parse_describe_extended(relation, rows)
+        except dbt.exceptions.RuntimeException as e:
+            # spark throws an error when the table doesn't exist, where other
+            # warehouses just return an empty list; normalize the behavior here
+            errmsg = getattr(e, "msg", "")
+            if (
+                "Table or view not found" in errmsg or
+                "NoSuchTableException" in errmsg
+            ):
+                pass
+            else:
+                raise e
+
+        # strip hudi metadata columns.
+        columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]
+
+        if not metadata:
+            return None
+
+        provider = metadata.get(KEY_TABLE_PROVIDER)
+        return self.Relation.create(
+            database=None,
+            schema=relation.schema,
+            identifier=relation.identifier,
+            type=relation.type,
+            is_delta=(provider == 'delta'),
+            is_hudi=(provider == 'hudi'),
+            owner=metadata.get(KEY_TABLE_OWNER),
+            stats=metadata.get(KEY_TABLE_STATISTICS),
+            columns=columns
+        )
 
-    def parse_columns_from_information(self, relation: SparkRelation) -> List[SparkColumn]:
-        owner_match = re.findall(self.INFORMATION_OWNER_REGEX, relation.information)
-        owner = owner_match[0] if owner_match else None
-        matches = re.finditer(self.INFORMATION_COLUMNS_REGEX, relation.information)
-        columns = []
-        stats_match = re.findall(self.INFORMATION_STATISTICS_REGEX, relation.information)
-        raw_table_stats = stats_match[0] if stats_match else None
-        table_stats = SparkColumn.convert_table_stats(raw_table_stats)
-        for match_num, match in enumerate(matches):
-            column_name, column_type, nullable = match.groups()
-            column = SparkColumn(
-                table_database=None,
-                table_schema=relation.schema,
-                table_name=relation.table,
-                table_type=relation.type,
-                column_index=match_num,
-                table_owner=owner,
-                column=column_name,
-                dtype=column_type,
-                table_stats=table_stats,
-            )
-            columns.append(column)
-        return columns
```

```diff
+    def _set_relation_information(self, relation: SparkRelation) -> SparkRelation:
+        """Fetch and attach the relation's information, or return it unchanged
+        if it is already hydrated."""
+        if relation.has_information():
+            return relation
+
+        updated_relation = self._get_updated_relation(relation)
+
+        self.cache.update_relation(updated_relation)
+        return updated_relation
```

```diff
-    def _get_columns_for_catalog(self, relation: SparkRelation) -> Iterable[Dict[str, Any]]:
-        columns = self.parse_columns_from_information(relation)
+    def _get_columns_for_catalog(
+        self, relation: SparkRelation
+    ) -> Iterable[Dict[str, Any]]:
+        updated_relation = self._set_relation_information(relation)
 
-        for column in columns:
+        for column in (updated_relation.columns if updated_relation is not None else []):
             # convert SparkColumns into catalog dicts
             as_dict = column.to_column_dict()
             as_dict["column_name"] = as_dict.pop("column", None)
```
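
For context, a hedged sketch of the per-column catalog entry this loop produces; the field names are assumed from `SparkColumn.to_column_dict` and the values are hypothetical:

```python
# One catalog entry per column, keyed the way dbt's docs generation expects.
as_dict = {
    "table_database": None,
    "table_schema": "analytics",
    "table_name": "events",
    "table_type": "table",
    "table_owner": "dbt",
    "column_name": "id",  # renamed from "column"
    "column_index": 0,
    "dtype": "bigint",
}
```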
15 changes: 12 additions & 3 deletions dbt/adapters/spark/relation.py

```diff
@@ -1,10 +1,12 @@
-from typing import Optional
+from typing import Optional, List, Dict, Hashable
 
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 
 from dbt.adapters.base.relation import BaseRelation, Policy
 from dbt.exceptions import RuntimeException
 
+from dbt.adapters.spark.column import SparkColumn
+
 
 @dataclass
 class SparkQuotePolicy(Policy):
@@ -27,7 +29,9 @@ class SparkRelation(BaseRelation):
     quote_character: str = "`"
     is_delta: Optional[bool] = None
     is_hudi: Optional[bool] = None
-    information: Optional[str] = None
+    owner: Optional[str] = None
+    stats: Optional[str] = None
+    columns: List[SparkColumn] = field(default_factory=lambda: [])
 
     def __post_init__(self):
         if self.database != self.schema and self.database:
@@ -40,3 +44,8 @@ def render(self):
             "include, but only one can be set"
         )
         return super().render()
+
+    def has_information(self) -> bool:
+        return self.owner is not None and \
+               self.stats is not None and \
+               len(self.columns) > 0
```
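
Finally, a sketch of the hydration check, assuming `Relation.create` forwards extra kwargs to the dataclass the way the adapter code above uses it; the schema, table, and column values are hypothetical:

```python
from dbt.contracts.relation import RelationType
from dbt.adapters.spark.relation import SparkRelation
from dbt.adapters.spark.column import SparkColumn

bare = SparkRelation.create(schema="analytics", identifier="events", type=RelationType.Table)
assert not bare.has_information()  # no owner/stats/columns yet

hydrated = SparkRelation.create(
    schema="analytics",
    identifier="events",
    type=RelationType.Table,
    owner="dbt",
    stats="1024 bytes",
    columns=[SparkColumn(column="id", dtype="bigint")],
)
assert hydrated.has_information()  # safe to serve from the cache
```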