Dramatically faster caching #433
@@ -0,0 +1,7 @@
+kind: Features
+body: Faster caching at run start. **Note:** Requires Spark v3
+time: 2022-08-19T21:57:14.56716+02:00
+custom:
+  Author: TalkWIthKeyboard
+  Issue: "228"
+  PR: "342"
@@ -4,11 +4,15 @@
 import base64
 from concurrent.futures import Future
 from dataclasses import dataclass
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, Iterable, List, Optional, Union, Tuple
 from typing_extensions import TypeAlias

 import agate
+from dbt.adapters.cache import RelationsCache, _CachedRelation
 from dbt.contracts.relation import RelationType
+from dbt.events.base_types import DebugLevel, Cache
+from dbt.adapters.reference_keys import _ReferenceKey, _make_key
+from dbt.events.functions import fire_event

 import dbt
 import dbt.exceptions
@@ -29,12 +33,14 @@

 GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME = "get_columns_in_relation_raw"
 LIST_SCHEMAS_MACRO_NAME = "list_schemas"
-LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
+LIST_TABLES_MACRO_NAME = "list_tables_without_caching"
+LIST_VIEWS_MACRO_NAME = "list_views_without_caching"
 DROP_RELATION_MACRO_NAME = "drop_relation"
 FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties"

 KEY_TABLE_OWNER = "Owner"
 KEY_TABLE_STATISTICS = "Statistics"
+KEY_TABLE_PROVIDER = "Provider"


 @dataclass
@@ -48,6 +54,46 @@ class SparkConfig(AdapterConfig):
     merge_update_columns: Optional[str] = None


+@dataclass
+class UpdateRelation(DebugLevel, Cache):
+    relation: _ReferenceKey
+    code: str = "E038"
+
+    def message(self) -> str:
+        return f"Updating relation: {str(self.relation)}"
+
+
+@dataclass
+class UpdateMissingRelation(DebugLevel, Cache):
+    relation: _ReferenceKey
+    code: str = "E039"
+
+    def message(self) -> str:
+        return f"updated a nonexistent relationship: {str(self.relation)}"
+
+
+class SparkRelationsCache(RelationsCache):
Review comment: The events above, and cache methods below, could absolutely move into …
+    def _update(self, relation: _CachedRelation):
+        key = relation.key()
+
+        if key not in self.relations:
+            fire_event(UpdateMissingRelation(relation=key))
+            return
+
+        self.relations[key].inner = relation.inner
+
+    def update_relation(self, relation):
+        """Update the relation's inner relation in the cache.
+
+        :param BaseRelation relation: The underlying relation.
+        """
+        cached = _CachedRelation(relation)
+        fire_event(UpdateRelation(relation=_make_key(cached)))
+
+        with self.lock:
+            self._update(cached)
+
+
 class SparkAdapter(SQLAdapter):
     COLUMN_NAMES = (
         "table_database",
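For context, a hedged sketch of how the two cache write paths in this hunk differ. Not code from the PR; the relation values are invented, and `adapter` stands for a constructed SparkAdapter:

```python
# Illustrative only -- schema/identifier values are made up.
rel = adapter.Relation.create(schema="analytics", identifier="events", type="table")

adapter.cache.add(rel)              # insert a brand-new entry into the cache
adapter.cache.update_relation(rel)  # replace the .inner of an existing entry;
                                    # fires UpdateMissingRelation (E039) if absent
```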
@@ -85,6 +131,10 @@ class SparkAdapter(SQLAdapter):
     ConnectionManager: TypeAlias = SparkConnectionManager
     AdapterSpecificConfigs: TypeAlias = SparkConfig

+    def __init__(self, config):
+        super().__init__(config)
+        self.cache = SparkRelationsCache()
+
     @classmethod
     def date_function(cls) -> str:
         return "current_timestamp()"
@@ -114,23 +164,23 @@ def quote(self, identifier):
         return "`{}`".format(identifier)

     def add_schema_to_cache(self, schema) -> str:
-        """Cache a new schema in dbt. It will show up in `list relations`."""
+        """Cache a new schema in dbt. It will show up in list relations"""
         if schema is None:
             name = self.nice_connection_name()
             dbt.exceptions.raise_compiler_error(
                 "Attempted to cache a null schema for {}".format(name)
             )
-        if dbt.flags.USE_CACHE:
-            self.cache.add_schema(None, schema)
+        self.cache.add_schema(None, schema)
         # so jinja doesn't render things
         return ""

     def list_relations_without_caching(
         self, schema_relation: SparkRelation
     ) -> List[SparkRelation]:
-        kwargs = {"schema_relation": schema_relation}
+        kwargs = {"relation": schema_relation}
         try:
-            results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
+            tables = self.execute_macro(LIST_TABLES_MACRO_NAME, kwargs=kwargs)
+            views = self.execute_macro(LIST_VIEWS_MACRO_NAME, kwargs=kwargs)
         except dbt.exceptions.RuntimeException as e:
             errmsg = getattr(e, "msg", "")
             if f"Database '{schema_relation}' not found" in errmsg:
@@ -141,37 +191,36 @@ def list_relations_without_caching(
                 return []

         relations = []
-        for row in results:
-            if len(row) != 4:
-                raise dbt.exceptions.RuntimeException(
-                    f'Invalid value from "show table extended ...", '
-                    f"got {len(row)} values, expected 4"
-                )
-            _schema, name, _, information = row
-            rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
-            is_delta = "Provider: delta" in information
-            is_hudi = "Provider: hudi" in information
+        view_names = views.columns["viewName"].values()
+
+        for tbl in tables:
+            rel_type = RelationType("view" if tbl["tableName"] in view_names else "table")
+            _schema = tbl["namespace"] if "namespace" in tables.column_names else tbl["database"]
             relation = self.Relation.create(
                 schema=_schema,
-                identifier=name,
+                identifier=tbl["tableName"],
                 type=rel_type,
-                information=information,
-                is_delta=is_delta,
-                is_hudi=is_hudi,
             )
             relations.append(relation)

         return relations

-    def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]:
+    def get_relation(
+        self, database: Optional[str], schema: str, identifier: str, needs_information=False
+    ) -> Optional[BaseRelation]:
Review comment (on lines +208 to +210): This adds a fourth argument to the `get_relation` method. The idea: If we need "sparse" information, a sparse cache lookup will suffice. If we need "bonus" information (e.g. file format), then we need to first check to see if that additional information is available in the cache from a previous lookup. If not, we'll run a query to look it up, and update the cache accordingly.
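To make that two-tier lookup concrete, here is a rough sketch of how a caller might use the new flag. The schema and table names are invented for illustration; this is not code from the PR:

```python
# Sparse lookup: enough to know the relation exists and what type it is.
rel = adapter.get_relation(database=None, schema="analytics", identifier="events")

# "Bonus" lookup: the caller needs extended metadata (file format, owner, stats).
# On a cache hit with full information this is free; otherwise the adapter runs
# one metadata query and writes the result back into the cache.
rel = adapter.get_relation(
    database=None, schema="analytics", identifier="events", needs_information=True
)
if rel is not None and rel.is_delta:
    pass  # e.g. pick a strategy only supported by Delta
```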
         if not self.Relation.include_policy.database:
             database = None  # type: ignore

-        return super().get_relation(database, schema, identifier)
+        cached = super().get_relation(database, schema, identifier)
+
+        if not needs_information:
+            return cached
+
+        return self._set_relation_information(cached) if cached else None

     def parse_describe_extended(
         self, relation: Relation, raw_rows: List[agate.Row]
-    ) -> List[SparkColumn]:
+    ) -> Tuple[Dict[str, Any], List[SparkColumn]]:
         # Convert the Row to a dict
         dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows]
         # Find the separator between the rows and the metadata provided
@@ -184,7 +233,7 @@ def parse_describe_extended(

         raw_table_stats = metadata.get(KEY_TABLE_STATISTICS)
         table_stats = SparkColumn.convert_table_stats(raw_table_stats)
-        return [
+        return metadata, [
             SparkColumn(
                 table_database=None,
                 table_schema=relation.schema,
@@ -209,69 +258,83 @@ def find_table_information_separator(rows: List[dict]) -> int:
         return pos

     def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
-        cached_relations = self.cache.get_relations(relation.database, relation.schema)
-        cached_relation = next(
-            (
-                cached_relation
-                for cached_relation in cached_relations
-                if str(cached_relation) == str(relation)
-            ),
-            None,
-        )
-        columns = []
-        if cached_relation and cached_relation.information:
-            columns = self.parse_columns_from_information(cached_relation)
-        if not columns:
-            # in open source delta, 'show table extended' query output doesn't
-            # return the relation's schema. if columns are empty from cache,
-            # use the get_columns_in_relation spark macro,
-            # which would execute a 'describe extended tablename' query
-            try:
-                rows: List[agate.Row] = self.execute_macro(
-                    GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
-                )
-                columns = self.parse_describe_extended(relation, rows)
-            except dbt.exceptions.RuntimeException as e:
-                # spark would throw an error when the table doesn't exist, where other
-                # CDWs would just return an empty list; normalizing the behavior here
-                errmsg = getattr(e, "msg", "")
-                if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
-                    pass
-                else:
-                    raise e
+        # We shouldn't access columns from the cache, until we've implemented
+        # proper cache update or invalidation at the column level
+        # https://github.com/dbt-labs/dbt-spark/issues/431
Review comment (on lines +261 to +263): Most relation attributes are unchanging, unless the relation is dropped and recreated. (dbt-core already has a mechanism to record that drop in its cache.) However, on a few occasions, dbt does alter the columns within a relation: namely, to handle schema evolution in snapshots and incremental models (…).
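A small, hypothetical timeline of the staleness hazard that comment describes. The table, columns, and DDL are invented; `adapter.execute` stands in for any out-of-band DDL run during a model:

```python
# Hypothetical illustration of issue #431 -- why cached columns can go stale.
# 1. A lookup caches the relation along with its columns.
rel = adapter.get_relation(None, "analytics", "events", needs_information=True)
assert [c.name for c in rel.columns] == ["id", "ts"]  # invented column names

# 2. An incremental run with schema evolution enabled adds a column,
#    without touching the cache entry (no drop/recreate is recorded).
adapter.execute("alter table analytics.events add columns (user_id string)")

# 3. A later read served from the cache would still report ["id", "ts"],
#    which is why this PR re-queries instead of trusting cached columns.
```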
+
+        # cached_relations = self.cache.get_relations(relation.database, relation.schema)
+        # cached_relation = next(
+        #     (
+        #         cached_relation
+        #         for cached_relation in cached_relations
+        #         if str(cached_relation) == str(relation)
+        #     ),
+        #     None,
+        # )
+
+        # For now, just always invalidate the cache
+        updated_relation, columns = self._get_updated_relation(relation)
+        if updated_relation:
+            self.cache.add(updated_relation)
+        return columns
+
+    def _get_updated_relation(
+        self, relation: BaseRelation
+    ) -> Tuple[Optional[SparkRelation], List[SparkColumn]]:
+        metadata = None
+        columns: List[SparkColumn] = []
+
+        try:
+            rows: List[agate.Row] = self.execute_macro(
+                GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
+            )
+            metadata, columns = self.parse_describe_extended(relation, rows)
+        except dbt.exceptions.RuntimeException as e:
+            # spark would throw an error when the table doesn't exist, where other
+            # CDWs would just return an empty list; normalizing the behavior here
+            errmsg = getattr(e, "msg", "")
+            if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
+                pass
+            else:
+                raise e

         # strip hudi metadata columns.
         columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]
-        return columns
-
-    def parse_columns_from_information(self, relation: SparkRelation) -> List[SparkColumn]:
-        owner_match = re.findall(self.INFORMATION_OWNER_REGEX, relation.information)
-        owner = owner_match[0] if owner_match else None
-        matches = re.finditer(self.INFORMATION_COLUMNS_REGEX, relation.information)
-        columns = []
-        stats_match = re.findall(self.INFORMATION_STATISTICS_REGEX, relation.information)
-        raw_table_stats = stats_match[0] if stats_match else None
-        table_stats = SparkColumn.convert_table_stats(raw_table_stats)
-        for match_num, match in enumerate(matches):
-            column_name, column_type, nullable = match.groups()
-            column = SparkColumn(
-                table_database=None,
-                table_schema=relation.schema,
-                table_name=relation.table,
-                table_type=relation.type,
-                column_index=match_num,
-                table_owner=owner,
-                column=column_name,
-                dtype=column_type,
-                table_stats=table_stats,
-            )
-            columns.append(column)
-        return columns
+        if not metadata:
+            # this is a temporary view, not worth caching -- just return columns
+            return None, columns
+
+        provider = metadata.get(KEY_TABLE_PROVIDER)
+        return (
+            self.Relation.create(
+                database=None,
+                schema=relation.schema,
+                identifier=relation.identifier,
+                type=relation.type,
+                is_delta=(provider == "delta"),
+                is_hudi=(provider == "hudi"),
+                owner=metadata.get(KEY_TABLE_OWNER),
+                stats=metadata.get(KEY_TABLE_STATISTICS),
+                columns=columns,
+            ),
+            columns,
+        )
+
+    def _set_relation_information(self, relation: SparkRelation) -> SparkRelation:
+        """Update the information of the relation, or return it if it already exists."""
+        if relation.has_information():
+            return relation
+
+        updated_relation, _ = self._get_updated_relation(relation)
+
+        self.cache.update_relation(updated_relation)
+        return updated_relation

     def _get_columns_for_catalog(self, relation: SparkRelation) -> Iterable[Dict[str, Any]]:
-        columns = self.parse_columns_from_information(relation)
+        updated_relation = self._set_relation_information(relation)

-        for column in columns:
+        for column in updated_relation.columns:
             # convert SparkColumns into catalog dicts
             as_dict = column.to_column_dict()
             as_dict["column_name"] = as_dict.pop("column", None)
Review comment: This enables SparkColumn to be validated as a field on SparkRelation.
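A minimal sketch of what that means structurally, with simplified stand-ins rather than the actual dbt-spark definitions: once SparkColumn is itself a dataclass, it can be declared, and validated, as a typed field on the relation.

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Simplified stand-ins for the real SparkColumn / SparkRelation dataclasses.
@dataclass
class SparkColumn:
    column: str
    dtype: str
    table_owner: Optional[str] = None

@dataclass
class SparkRelation:
    schema: str
    identifier: str
    is_delta: bool = False
    is_hudi: bool = False
    # Because SparkColumn is a dataclass, it can now be validated
    # as a nested field when the relation is (de)serialized.
    columns: List[SparkColumn] = field(default_factory=list)
```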