Dramatically faster caching #433

Closed: wants to merge 10 commits
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20220819-215714.yaml
@@ -0,0 +1,7 @@
kind: Features
body: Faster caching at run start. **Note:** Requires Spark v3
time: 2022-08-19T21:57:14.56716+02:00
custom:
Author: TalkWIthKeyboard
Issue: "228"
PR: "342"
4 changes: 2 additions & 2 deletions dbt/adapters/spark/column.py
@@ -2,14 +2,14 @@
from typing import Any, Dict, Optional, TypeVar, Union

from dbt.adapters.base.column import Column
from dbt.dataclass_schema import dbtClassMixin
from dbt.contracts.relation import FakeAPIObject
from hologram import JsonDict

Self = TypeVar("Self", bound="SparkColumn")


@dataclass
class SparkColumn(dbtClassMixin, Column):
class SparkColumn(FakeAPIObject, Column):
Comment from the PR author:
This enables SparkColumn to be validated as a field on SparkRelation
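A rough usage sketch of what this unlocks, together with the new `columns` field added to `SparkRelation` further down in relation.py. It assumes the dbt-spark package from this branch is importable; the table, schema, and owner values are made up:

```python
from dbt.adapters.spark.column import SparkColumn
from dbt.adapters.spark.relation import SparkRelation
from dbt.contracts.relation import RelationType

# A column built the same way impl.py builds them...
col = SparkColumn(
    table_database=None,
    table_schema="analytics",
    table_name="orders",
    table_type=RelationType.Table,
    table_owner="data_eng",
    column="id",
    column_index=0,
    dtype="bigint",
)

# ...can now live directly on the relation, so validating the relation's
# schema also validates its columns.
rel = SparkRelation.create(
    schema="analytics",
    identifier="orders",
    type=RelationType.Table,
    columns=[col],  # new field added in relation.py below
)
```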

table_database: Optional[str] = None
table_schema: Optional[str] = None
table_name: Optional[str] = None
225 changes: 144 additions & 81 deletions dbt/adapters/spark/impl.py
@@ -4,11 +4,15 @@
import base64
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Union
from typing import Any, Dict, Iterable, List, Optional, Union, Tuple
from typing_extensions import TypeAlias

import agate
from dbt.adapters.cache import RelationsCache, _CachedRelation
from dbt.contracts.relation import RelationType
from dbt.events.base_types import DebugLevel, Cache
from dbt.adapters.reference_keys import _ReferenceKey, _make_key
from dbt.events.functions import fire_event

import dbt
import dbt.exceptions
@@ -29,12 +33,14 @@

GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME = "get_columns_in_relation_raw"
LIST_SCHEMAS_MACRO_NAME = "list_schemas"
LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
LIST_TABLES_MACRO_NAME = "list_tables_without_caching"
LIST_VIEWS_MACRO_NAME = "list_views_without_caching"
DROP_RELATION_MACRO_NAME = "drop_relation"
FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties"

KEY_TABLE_OWNER = "Owner"
KEY_TABLE_STATISTICS = "Statistics"
KEY_TABLE_PROVIDER = "Provider"


@dataclass
@@ -48,6 +54,46 @@ class SparkConfig(AdapterConfig):
merge_update_columns: Optional[str] = None


@dataclass
class UpdateRelation(DebugLevel, Cache):
relation: _ReferenceKey
code: str = "E038"

def message(self) -> str:
return f"Updating relation: {str(self.relation)}"


@dataclass
class UpdateMissingRelation(DebugLevel, Cache):
relation: _ReferenceKey
code: str = "E039"

def message(self) -> str:
return f"updated a nonexistent relationship: {str(self.relation)}"


class SparkRelationsCache(RelationsCache):
Comment from the PR author:
The events above, and the cache methods below, could absolutely move into dbt-core. The big idea here is:

  • At the start of the run, we populate a sparser cache: which relations exist, what their names are, and what types they are.
  • For certain operations, we need to look up more detailed information (e.g. is this a Delta table?). In that case, we look up the info and update the cached relation, so that the next lookup will be free. (A minimal sketch of this flow follows below.)
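A minimal, self-contained sketch of that two-phase flow, using a plain dict in place of dbt's RelationsCache. Every name here (`populate_sparse_cache`, `describe_provider`, and so on) is illustrative, not dbt-core or dbt-spark API:

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class CachedRelation:
    schema: str
    identifier: str
    rel_type: str                    # "table" or "view": known from the cheap run-start listing
    is_delta: Optional[bool] = None  # "bonus" info, filled in lazily on first detailed lookup

    def key(self) -> Tuple[str, str]:
        return (self.schema, self.identifier)


cache: Dict[Tuple[str, str], CachedRelation] = {}


def populate_sparse_cache(listing) -> None:
    """Run-start pass: record only which relations exist, their names, and their types."""
    for schema, name, rel_type in listing:
        rel = CachedRelation(schema, name, rel_type)
        cache[rel.key()] = rel


def describe_provider(schema: str, name: str) -> str:
    """Stand-in for a `describe table extended` query; hard-coded for the sketch."""
    return "delta"


def get_relation(schema: str, name: str, needs_information: bool = False) -> Optional[CachedRelation]:
    """Cheap lookups hit the sparse entry; detailed lookups enrich it exactly once."""
    rel = cache.get((schema, name))
    if rel is None or not needs_information:
        return rel
    if rel.is_delta is None:                                       # not looked up yet
        rel.is_delta = describe_provider(schema, name) == "delta"  # one extra query
    return rel                                                     # the next detailed lookup is free


populate_sparse_cache([("analytics", "orders", "table")])
print(get_relation("analytics", "orders", needs_information=True).is_delta)  # True
```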

def _update(self, relation: _CachedRelation):
key = relation.key()

if key not in self.relations:
fire_event(UpdateMissingRelation(relation=key))
return

self.relations[key].inner = relation.inner

def update_relation(self, relation):
"""Update the relation inner to the cache

: param BaseRelation relation: The underlying relation.
"""
cached = _CachedRelation(relation)
fire_event(UpdateRelation(relation=_make_key(cached)))

with self.lock:
self._update(cached)


class SparkAdapter(SQLAdapter):
COLUMN_NAMES = (
"table_database",
@@ -85,6 +131,10 @@ class SparkAdapter(SQLAdapter):
ConnectionManager: TypeAlias = SparkConnectionManager
AdapterSpecificConfigs: TypeAlias = SparkConfig

def __init__(self, config):
super().__init__(config)
self.cache = SparkRelationsCache()

@classmethod
def date_function(cls) -> str:
return "current_timestamp()"
@@ -114,23 +164,23 @@ def quote(self, identifier):
return "`{}`".format(identifier)

def add_schema_to_cache(self, schema) -> str:
"""Cache a new schema in dbt. It will show up in `list relations`."""
"""Cache a new schema in dbt. It will show up in list relations"""
if schema is None:
name = self.nice_connection_name()
dbt.exceptions.raise_compiler_error(
"Attempted to cache a null schema for {}".format(name)
)
if dbt.flags.USE_CACHE:
self.cache.add_schema(None, schema)
self.cache.add_schema(None, schema)
# so jinja doesn't render things
return ""

def list_relations_without_caching(
self, schema_relation: SparkRelation
) -> List[SparkRelation]:
kwargs = {"schema_relation": schema_relation}
kwargs = {"relation": schema_relation}
try:
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
tables = self.execute_macro(LIST_TABLES_MACRO_NAME, kwargs=kwargs)
views = self.execute_macro(LIST_VIEWS_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.RuntimeException as e:
errmsg = getattr(e, "msg", "")
if f"Database '{schema_relation}' not found" in errmsg:
@@ -141,37 +191,36 @@ def list_relations_without_caching(
return []

relations = []
for row in results:
if len(row) != 4:
raise dbt.exceptions.RuntimeException(
f'Invalid value from "show table extended ...", '
f"got {len(row)} values, expected 4"
)
_schema, name, _, information = row
rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
is_delta = "Provider: delta" in information
is_hudi = "Provider: hudi" in information
view_names = views.columns["viewName"].values()

for tbl in tables:
rel_type = RelationType("view" if tbl["tableName"] in view_names else "table")
_schema = tbl["namespace"] if "namespace" in tables.column_names else tbl["database"]
relation = self.Relation.create(
schema=_schema,
identifier=name,
identifier=tbl["tableName"],
type=rel_type,
information=information,
is_delta=is_delta,
is_hudi=is_hudi,
)
relations.append(relation)

return relations

def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]:
def get_relation(
self, database: Optional[str], schema: str, identifier: str, needs_information=False
) -> Optional[BaseRelation]:
Comment on lines +208 to +210 (jtcohen6, Contributor Author, Aug 19, 2022):
This adds a fourth argument to the get_relation signature, which only has 3 in dbt-core's "base" adapter implementation.

The idea: If we need "sparse" information, a sparse cache lookup will suffice. If we need "bonus" information (e.g. file format), then we need to first check to see if that additional information is available in the cache from a previous lookup. If not, we'll run a query to look it up, and update the cache accordingly.
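A hedged usage sketch of the new argument. The helper name is made up and not part of this PR; it assumes the dbt-spark code from this branch is importable:

```python
from typing import Optional

from dbt.adapters.spark.relation import SparkRelation


def is_delta_table(adapter, schema: str, identifier: str) -> bool:
    """Illustrative helper: ask for the enriched relation only when file-format
    details are actually needed. A plain existence check would omit
    needs_information and stay a pure cache hit; with it set, the adapter may
    run one `describe table extended` and write the result back into the
    cache, so repeated detailed lookups are free."""
    relation: Optional[SparkRelation] = adapter.get_relation(
        database=None,           # Spark's include policy drops the database
        schema=schema,
        identifier=identifier,
        needs_information=True,  # the new fourth argument introduced here
    )
    return bool(relation and relation.is_delta)
```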

if not self.Relation.include_policy.database:
database = None # type: ignore

return super().get_relation(database, schema, identifier)
cached = super().get_relation(database, schema, identifier)

if not needs_information:
return cached

return self._set_relation_information(cached) if cached else None

def parse_describe_extended(
self, relation: Relation, raw_rows: List[agate.Row]
) -> List[SparkColumn]:
) -> Tuple[Dict[str, Any], List[SparkColumn]]:
# Convert the Row to a dict
dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows]
# Find the separator between the rows and the metadata provided
@@ -184,7 +233,7 @@

raw_table_stats = metadata.get(KEY_TABLE_STATISTICS)
table_stats = SparkColumn.convert_table_stats(raw_table_stats)
return [
return metadata, [
SparkColumn(
table_database=None,
table_schema=relation.schema,
@@ -209,69 +258,83 @@ def find_table_information_separator(rows: List[dict]) -> int:
return pos

def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
cached_relations = self.cache.get_relations(relation.database, relation.schema)
cached_relation = next(
(
cached_relation
for cached_relation in cached_relations
if str(cached_relation) == str(relation)
),
None,
)
columns = []
if cached_relation and cached_relation.information:
columns = self.parse_columns_from_information(cached_relation)
if not columns:
# in open source delta 'show table extended' query output doesnt
# return relation's schema. if columns are empty from cache,
# use get_columns_in_relation spark macro
# which would execute 'describe extended tablename' query
try:
rows: List[agate.Row] = self.execute_macro(
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
)
columns = self.parse_describe_extended(relation, rows)
except dbt.exceptions.RuntimeException as e:
# spark would throw error when table doesn't exist, where other
# CDW would just return and empty list, normalizing the behavior here
errmsg = getattr(e, "msg", "")
if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
pass
else:
raise e
# We shouldn't access columns from the cache, until we've implemented
# proper cache update or invalidation at the column level
# https://github.com/dbt-labs/dbt-spark/issues/431
Comment on lines +261 to +263 (Contributor Author):
Most relation attributes are unchanging, unless the relation is dropped and recreated. (dbt-core already has a mechanism to record that drop in its cache.)

However, on a few occasions, dbt does alter the columns within a relation: namely, to handle schema evolution in snapshots and incremental models (on_schema_change). We don't have a core mechanism to invalidate or update the cache when this happens. So, even though we have been and are still recording columns in the cache, we need to make get_columns_in_relation skip the cache and run a query every time.
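A tiny standalone illustration of the staleness risk being described here, using plain dicts rather than any dbt API; the table and column names are made up:

```python
# What Spark actually has vs. what dbt cached at the start of the run.
warehouse = {"orders": ["id", "amount"]}
cache = {"orders": list(warehouse["orders"])}  # snapshot taken when the run began

# An incremental model with on_schema_change='append_new_columns' evolves the
# table mid-run; nothing tells the relation cache about the new column.
warehouse["orders"].append("discount")

# A get_columns_in_relation that trusted the cache would now be wrong:
assert cache["orders"] != warehouse["orders"]  # stale: missing "discount"

# Hence this PR always re-queries for columns, until column-level cache
# invalidation lands in dbt-core (see dbt-spark issue #431 referenced above).
```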


# cached_relations = self.cache.get_relations(relation.database, relation.schema)
# cached_relation = next(
# (
# cached_relation
# for cached_relation in cached_relations
# if str(cached_relation) == str(relation)
# ),
# None,
# )

# For now, just always invalidate the cache
updated_relation, columns = self._get_updated_relation(relation)
if updated_relation:
self.cache.add(updated_relation)
return columns

def _get_updated_relation(
self, relation: BaseRelation
) -> Tuple[Optional[SparkRelation], List[SparkColumn]]:
metadata = None
columns: List[SparkColumn] = []

try:
rows: List[agate.Row] = self.execute_macro(
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
)
Comment on lines +288 to +290 (Contributor Author):
This is the `describe table extended` query Spark uses to return detailed information about the relation, including its columns.
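Roughly what that query returns and how parse_describe_extended splits it. The rows below are abbreviated, made-up examples; real Spark output includes a blank separator row and many more metadata entries:

```python
# Column rows come first, then a "# Detailed Table Information" section whose
# rows are effectively key/value pairs keyed by col_name.
raw_rows = [
    {"col_name": "id",     "data_type": "bigint", "comment": None},
    {"col_name": "amount", "data_type": "double", "comment": None},
    {"col_name": "# Detailed Table Information", "data_type": "", "comment": ""},
    {"col_name": "Owner",      "data_type": "data_eng",   "comment": ""},
    {"col_name": "Provider",   "data_type": "delta",      "comment": ""},
    {"col_name": "Statistics", "data_type": "1024 bytes", "comment": ""},
]

# Find the separator: the first empty or "#"-prefixed col_name. Everything
# before it describes columns; everything after it is table-level metadata.
pos = next(
    i for i, row in enumerate(raw_rows)
    if not row["col_name"] or row["col_name"].startswith("#")
)
columns = raw_rows[:pos]
metadata = {row["col_name"]: row["data_type"] for row in raw_rows[pos + 1:]}

print([c["col_name"] for c in columns])  # ['id', 'amount']
print(metadata["Provider"])              # 'delta'
```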

metadata, columns = self.parse_describe_extended(relation, rows)
except dbt.exceptions.RuntimeException as e:
# Spark throws an error when the table doesn't exist, where other
# CDWs would just return an empty list; normalize the behavior here
errmsg = getattr(e, "msg", "")
if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
pass
else:
raise e

# strip hudi metadata columns.
columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]
return columns

def parse_columns_from_information(self, relation: SparkRelation) -> List[SparkColumn]:
owner_match = re.findall(self.INFORMATION_OWNER_REGEX, relation.information)
owner = owner_match[0] if owner_match else None
matches = re.finditer(self.INFORMATION_COLUMNS_REGEX, relation.information)
columns = []
stats_match = re.findall(self.INFORMATION_STATISTICS_REGEX, relation.information)
raw_table_stats = stats_match[0] if stats_match else None
table_stats = SparkColumn.convert_table_stats(raw_table_stats)
for match_num, match in enumerate(matches):
column_name, column_type, nullable = match.groups()
column = SparkColumn(
table_database=None,
table_schema=relation.schema,
table_name=relation.table,
table_type=relation.type,
column_index=match_num,
table_owner=owner,
column=column_name,
dtype=column_type,
table_stats=table_stats,
)
columns.append(column)
return columns
if not metadata:
# this is a temporary view, not worth caching -- just return columns
return None, columns

provider = metadata.get(KEY_TABLE_PROVIDER)
return (
self.Relation.create(
database=None,
schema=relation.schema,
identifier=relation.identifier,
type=relation.type,
is_delta=(provider == "delta"),
is_hudi=(provider == "hudi"),
owner=metadata.get(KEY_TABLE_OWNER),
stats=metadata.get(KEY_TABLE_STATISTICS),
columns=columns,
),
columns,
)

def _set_relation_information(self, relation: SparkRelation) -> SparkRelation:
"""Update the information of the relation, or return it if it already exists."""
if relation.has_information():
return relation

updated_relation, _ = self._get_updated_relation(relation)

self.cache.update_relation(updated_relation)
return updated_relation

def _get_columns_for_catalog(self, relation: SparkRelation) -> Iterable[Dict[str, Any]]:
columns = self.parse_columns_from_information(relation)
updated_relation = self._set_relation_information(relation)

for column in columns:
for column in updated_relation.columns:
# convert SparkColumns into catalog dicts
as_dict = column.to_column_dict()
as_dict["column_name"] = as_dict.pop("column", None)
13 changes: 10 additions & 3 deletions dbt/adapters/spark/relation.py
@@ -1,10 +1,12 @@
from typing import Optional
from typing import Optional, List

from dataclasses import dataclass
from dataclasses import dataclass, field

from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.exceptions import RuntimeException

from dbt.adapters.spark.column import SparkColumn


@dataclass
class SparkQuotePolicy(Policy):
@@ -27,7 +29,9 @@ class SparkRelation(BaseRelation):
quote_character: str = "`"
is_delta: Optional[bool] = None
is_hudi: Optional[bool] = None
information: Optional[str] = None
owner: Optional[str] = None
stats: Optional[str] = None
columns: List[SparkColumn] = field(default_factory=lambda: [])

def __post_init__(self):
if self.database != self.schema and self.database:
@@ -40,3 +44,6 @@ def render(self):
"include, but only one can be set"
)
return super().render()

def has_information(self) -> bool:
return self.owner is not None and self.stats is not None and len(self.columns) > 0