Skip to content

Commit

Permalink
Exposure dependency resolution by fully-qualified names (#287)
Browse files Browse the repository at this point in the history
* Exposure dependency resolution by fully-qualified names

* Reference databases by dbname

* remove redundant calls to extract_card_exposures with empty cards

* extract dbname from details

* update dependencies

* support project-id as exposure database
  • Loading branch information
gouline authored Dec 16, 2024
1 parent c2ba2fb commit 781af8c
Show file tree
Hide file tree
Showing 5 changed files with 1,327 additions and 807 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
export SETUPTOOLS_SCM_PRETEND_VERSION ?= 0.0.0

.PHONY: dependencies
dependencies:
SETUPTOOLS_SCM_PRETEND_VERSION=0.0.0 \
uv sync --no-install-project --all-extras --frozen

.PHONY: upgrade
upgrade:
SETUPTOOLS_SCM_PRETEND_VERSION=0.0.0 \
uv sync --no-install-project --all-extras --upgrade

.PHONY: build
Expand Down
190 changes: 103 additions & 87 deletions dbtmetabase/_exposures.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from .errors import ArgumentError
from .format import Filter, dump_yaml, safe_description, safe_name
from .manifest import Manifest
from .manifest import DEFAULT_SCHEMA, Manifest

_RESOURCE_VERSION = 2

Expand Down Expand Up @@ -78,9 +78,28 @@ def extract_exposures(

models = self.manifest.read_models()

def dbname(details: Mapping) -> str:
"""Parse database name from Metabase database details."""
for key in ("dbname", "db", "project-id", "catalog"):
if key in details:
return details[key]
return ""

ctx = self.__Context(
model_refs={m.alias.lower(): m.ref for m in models if m.ref},
table_names={t["id"]: t["name"] for t in self.metabase.get_tables()},
model_refs={m.alias_path.lower(): m.ref for m in models if m.ref},
database_names={
d["id"]: dbname(d["details"]) for d in self.metabase.get_databases()
},
table_names={
t["id"]: ".".join(
[
dbname(t["db"]["details"]),
t["schema"] or DEFAULT_SCHEMA,
t["name"],
]
).lower()
for t in self.metabase.get_tables()
},
)

exposures = []
Expand Down Expand Up @@ -127,7 +146,7 @@ def extract_exposures(
f"Visualization: {entity.get('display', 'Unknown').title()}"
)

result = self.__extract_card_exposures(ctx, card=entity)
result = self.__extract_card_exposures(ctx, entity)
depends.update(result["depends"])
native_query = result["native_query"]

Expand Down Expand Up @@ -155,12 +174,9 @@ def extract_exposures(
if "id" not in card:
continue

depends.update(
self.__extract_card_exposures(
ctx,
card=self.metabase.find_card(uid=card["id"]),
)["depends"]
)
if card := self.metabase.find_card(uid=card["id"]):
result = self.__extract_card_exposures(ctx, card)
depends.update(result["depends"])
else:
_logger.warning("Unexpected collection item '%s'", item["model"])
continue
Expand Down Expand Up @@ -218,92 +234,91 @@ def extract_exposures(

return exposures

def __extract_card_exposures(
self,
ctx: __Context,
card: Optional[Mapping],
) -> Mapping:
def __extract_card_exposures(self, ctx: __Context, card: Mapping) -> Mapping:
"""Extracts exposures from Metabase questions."""

depends = set()
native_query = ""

if card:
query = card.get("dataset_query", {})
if query.get("type") == "query":
# Metabase GUI derived query
query_source = query.get("query", {}).get(
"source-table", card.get("table_id")
)

if str(query_source).startswith("card__"):
query = card.get("dataset_query", {})
if query.get("type") == "query":
# Metabase GUI derived query
query_source = query.get("query", {}).get(
"source-table", card.get("table_id")
)

if str(query_source).startswith("card__"):
# Handle questions based on other questions
if source_card := self.metabase.find_card(
uid=query_source.split("__")[-1]
):
result = self.__extract_card_exposures(ctx, source_card)
depends.update(result["depends"])
elif query_source in ctx.table_names:
# Normal question
source_table = ctx.table_names.get(query_source)
if source_table:
source_table = source_table.lower()
_logger.info("Extracted model '%s' from card", source_table)
depends.add(source_table)

# Find models exposed through joins
for join in query.get("query", {}).get("joins", []):
join_source = join.get("source-table")

if str(join_source).startswith("card__"):
# Handle questions based on other questions
depends.update(
self.__extract_card_exposures(
ctx,
card=self.metabase.find_card(
uid=query_source.split("__")[-1]
),
)["depends"]
)
elif query_source in ctx.table_names:
# Normal question
source_table = ctx.table_names.get(query_source)
if source_table:
source_table = source_table.lower()
_logger.info("Extracted model '%s' from card", source_table)
depends.add(source_table)

# Find models exposed through joins
for join in query.get("query", {}).get("joins", []):
join_source = join.get("source-table")

if str(join_source).startswith("card__"):
# Handle questions based on other questions
depends.update(
self.__extract_card_exposures(
ctx,
card=self.metabase.find_card(
uid=join_source.split("__")[-1]
),
)["depends"]
)
continue
if source_card := self.metabase.find_card(
uid=join_source.split("__")[-1]
):
result = self.__extract_card_exposures(ctx, source_card)
depends.update(result["depends"])

# Joined model parsed
joined_table = ctx.table_names.get(join_source)
if joined_table:
joined_table = joined_table.lower()
_logger.info("Extracted model '%s' from join", joined_table)
depends.add(joined_table)

elif query.get("type") == "native":
# Metabase native query
native_query = query["native"].get("query")
ctes: MutableSequence[str] = []

# Parse common table expressions for exclusion
for matched_cte in re.findall(_CTE_PARSER, native_query):
ctes.extend(group.lower() for group in matched_cte if group)

# Parse SQL for exposures through FROM or JOIN clauses
for sql_ref in re.findall(_EXPOSURE_PARSER, native_query):
# Grab just the table / model name
parsed_model = sql_ref.split(".")[-1].strip('"').lower()

# Scrub CTEs (qualified sql_refs can not reference CTEs)
if parsed_model in ctes and "." not in sql_ref:
continue
continue

# Verify this is one of our parsed refable models so exposures dont break the DAG
if not ctx.model_refs.get(parsed_model):
continue
# Joined model parsed
joined_table = ctx.table_names.get(join_source)
if joined_table:
joined_table = joined_table.lower()
_logger.info("Extracted model '%s' from join", joined_table)
depends.add(joined_table)

elif query.get("type") == "native":
# Metabase native query
native_query = query["native"].get("query")
ctes: MutableSequence[str] = []

# Parse common table expressions for exclusion
for matched_cte in re.findall(_CTE_PARSER, native_query):
ctes.extend(group.lower() for group in matched_cte if group)

# Parse SQL for exposures through FROM or JOIN clauses
for sql_ref in re.findall(_EXPOSURE_PARSER, native_query):
# DATABASE.schema.table -> [database, schema, table]
parsed_model_path = [s.strip('"').lower() for s in sql_ref.split(".")]

# Scrub CTEs (qualified sql_refs can not reference CTEs)
if parsed_model_path[-1] in ctes and "." not in sql_ref:
continue

# Missing schema -> use default schema
if len(parsed_model_path) < 2:
parsed_model_path.insert(0, DEFAULT_SCHEMA.lower())
# Missing database -> use query's database
if len(parsed_model_path) < 3:
database_name = ctx.database_names.get(query["database"], "")
parsed_model_path.insert(0, database_name.lower())

# Fully-qualified database.schema.table
parsed_model = ".".join(parsed_model_path)

# Verify this is one of our parsed refable models so exposures dont break the DAG
if not ctx.model_refs.get(parsed_model):
continue

if parsed_model:
_logger.info(
"Extracted model '%s' from native query", parsed_model
)
depends.add(parsed_model)
if parsed_model:
_logger.info("Extracted model '%s' from native query", parsed_model)
depends.add(parsed_model)

return {
"depends": depends,
Expand Down Expand Up @@ -429,4 +444,5 @@ def __write_exposures(
@dc.dataclass
class __Context:
model_refs: Mapping[str, str]
database_names: Mapping[str, str]
table_names: Mapping[str, str]
4 changes: 4 additions & 0 deletions dbtmetabase/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ def ref(self) -> Optional[str]:
return f"source('{self.source}', '{self.name}')"
return None

@property
def alias_path(self) -> str:
return ".".join([self.database, self.schema or DEFAULT_SCHEMA, self.alias])

def format_description(
self,
append_tags: bool = False,
Expand Down
6 changes: 5 additions & 1 deletion dbtmetabase/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,13 @@ def _api(

return response_json

def get_databases(self) -> Sequence[Mapping]:
"""Retrieves all databases."""
return list(self._api("get", "/api/database"))

def find_database(self, name: str) -> Optional[Mapping]:
"""Finds database by name attribute or returns none."""
for api_database in list(self._api("get", "/api/database")):
for api_database in self.get_databases():
if api_database["name"].upper() == name.upper():
return api_database
return None
Expand Down
Loading

0 comments on commit 781af8c

Please sign in to comment.