From 6628f59e6aaa6182aa1c35198f34ae01bfd4f4e5 Mon Sep 17 00:00:00 2001 From: Mike Gouline <1960272+gouline@users.noreply.github.com> Date: Tue, 9 Jan 2024 14:37:12 +1100 Subject: [PATCH] Improvements to exposure extraction --- Makefile | 22 ++++ dbtmetabase/__main__.py | 28 +++-- dbtmetabase/_format.py | 28 +++++ dbtmetabase/metabase.py | 172 +++++++++++++++++++---------- tests/fixtures/exposure/.gitignore | 2 +- tests/test_metabase.py | 6 +- 6 files changed, 183 insertions(+), 75 deletions(-) create mode 100644 dbtmetabase/_format.py diff --git a/Makefile b/Makefile index 200e4904..1bbf4c4f 100644 --- a/Makefile +++ b/Makefile @@ -73,6 +73,28 @@ dev-sandbox-models: --metabase-database $$POSTGRES_DB ) .PHONY: dev-sandbox-models +dev-sandbox-exposures: + rm -rf tests/fixtures/sample_project/models/exposures + mkdir -p tests/fixtures/sample_project/models/exposures + ( source sandbox/.env && python3 -m dbtmetabase exposures \ + --dbt-manifest-path sandbox/target/manifest.json \ + --dbt-database $$POSTGRES_DB \ + --metabase-url http://localhost:$$MB_PORT \ + --metabase-username $$MB_USER \ + --metabase-password $$MB_PASSWORD \ + --output-path tests/fixtures/sample_project/models/exposures \ + --output-grouping collection ) + + ( source sandbox/.env && cd tests/fixtures/sample_project && \ + POSTGRES_HOST=localhost \ + POSTGRES_PORT=$$POSTGRES_PORT \ + POSTGRES_USER=$$POSTGRES_USER \ + POSTGRES_PASSWORD=$$POSTGRES_PASSWORD \ + POSTGRES_DB=$$POSTGRES_DB \ + POSTGRES_SCHEMA=$$POSTGRES_SCHEMA \ + dbt docs generate --profiles-dir ../../../sandbox ) +.PHONY: dev-sandbox-exposures + dev-sandbox-down: ( cd sandbox && docker-compose down ) .PHONY: dev-sandbox-up diff --git a/dbtmetabase/__main__.py b/dbtmetabase/__main__.py index 06040e4e..d6b526d5 100644 --- a/dbtmetabase/__main__.py +++ b/dbtmetabase/__main__.py @@ -340,17 +340,14 @@ def models( type=click.Path(exists=True, file_okay=False), default=".", show_default=True, - help="Output path for generated exposure YAML.", + help="Output path for generated exposure YAML files.", ) @click.option( - "--output-name", - metavar="NAME", - envvar="OUTPUT_NAME", + "--output-grouping", + envvar="OUTPUT_GROUPING", show_envvar=True, - type=click.STRING, - default="metabase_exposures.yml", - show_default=True, - help="File name for generated exposure YAML.", + type=click.Choice(["collection", "type"]), + help="Grouping for output YAML files", ) @click.option( "--metabase-include-personal-collections", @@ -359,6 +356,15 @@ def models( is_flag=True, help="Include personal collections when parsing exposures.", ) +@click.option( + "--metabase-collection-includes", + metavar="COLLECTIONS", + envvar="METABASE_COLLECTION_INCLUDES", + show_envvar=True, + type=click.UNPROCESSED, + callback=_comma_separated_list_callback, + help="Metabase collection names to includes.", +) @click.option( "--metabase-collection-excludes", metavar="COLLECTIONS", @@ -370,8 +376,9 @@ def models( ) def exposures( output_path: str, - output_name: str, + output_grouping: Optional[str], metabase_include_personal_collections: bool, + metabase_collection_includes: Optional[Iterable], metabase_collection_excludes: Optional[Iterable], dbt_reader: DbtReader, metabase_client: MetabaseClient, @@ -380,8 +387,9 @@ def exposures( metabase_client.extract_exposures( models=dbt_models, output_path=output_path, - output_name=output_name, + output_grouping=output_grouping, include_personal_collections=metabase_include_personal_collections, + collection_includes=metabase_collection_includes, collection_excludes=metabase_collection_excludes, ) diff --git a/dbtmetabase/_format.py b/dbtmetabase/_format.py new file mode 100644 index 00000000..22dea718 --- /dev/null +++ b/dbtmetabase/_format.py @@ -0,0 +1,28 @@ +import re +from typing import Optional + + +def safe_name(text: Optional[str]) -> str: + """Sanitizes a human-readable "friendly" name to a safe string. + + For example, "Joe's Collection" becomes "joe_s_collection". + + Args: + text (Optional[str]): Unsafe text with non-underscore symbols and spaces. + + Returns: + str: Sanitized lowercase string with underscores. + """ + return re.sub(r"[^\w]", "_", text or "").lower() + + +def safe_description(text: Optional[str]) -> str: + """Sanitizes a human-readable long text, such as description. + + Args: + text (Optional[str]): Unsafe long text with Jinja syntax. + + Returns: + str: Sanitized string with escaped Jinja syntax. + """ + return re.sub(r"{{(.*)}}", r"\1", text or "") diff --git a/dbtmetabase/metabase.py b/dbtmetabase/metabase.py index 8ed373d9..393ca16e 100644 --- a/dbtmetabase/metabase.py +++ b/dbtmetabase/metabase.py @@ -4,12 +4,23 @@ import re import time from pathlib import Path -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union +from typing import ( + Any, + Dict, + Iterable, + List, + Mapping, + MutableMapping, + Optional, + Tuple, + Union, +) import requests import yaml from requests.adapters import HTTPAdapter, Retry +from ._format import safe_description, safe_name from .dbt import ( METABASE_MODEL_DEFAULT_SCHEMA, MetabaseColumn, @@ -184,7 +195,7 @@ def _export_model(self, model: MetabaseModel) -> bool: api_display_name = api_table.get("display_name") if api_display_name != model_display_name and ( model_display_name - or not self._friendly_equal(api_display_name, api_table.get("name")) + or safe_name(api_display_name) != safe_name(api_table.get("name")) ): body_table["display_name"] = model_display_name @@ -321,7 +332,7 @@ def _export_column( api_display_name = api_field.get("display_name") if api_display_name != column_display_name and ( column_display_name - or not self._friendly_equal(api_display_name, api_field.get("name")) + or safe_name(api_display_name) != safe_name(api_field.get("name")) ): body_field["display_name"] = column_display_name @@ -399,24 +410,6 @@ def _load_tables(self, database_id: str) -> Mapping[str, MutableMapping]: return tables - def _friendly_equal(self, a: Optional[str], b: Optional[str]) -> bool: - """Equality test for parameters susceptible to Metabase "friendly names". - - For example, "Some Name" is a friendly name for "some_name". - - Args: - a (Optional[str]): Possibly-friendly string. - b (Optional[str]): Possibly-friendly string. - - Returns: - bool: True if strings are equal normalization. - """ - - def normalize(x: Optional[str]) -> str: - return (x or "").replace(" ", "_").replace("-", "_").lower() - - return normalize(a) == normalize(b) - class _ExtractExposuresJob(_MetabaseClientJob): _RESOURCE_VERSION = 2 @@ -437,15 +430,23 @@ def __init__( client: MetabaseClient, models: List[MetabaseModel], output_path: str, - output_name: str, + output_grouping: Optional[str], include_personal_collections: bool, + collection_includes: Optional[Iterable], collection_excludes: Optional[Iterable], ): super().__init__(client) self.model_refs = {model.name.upper(): model.ref for model in models} - self.output_file = Path(output_path).expanduser() / f"{output_name}.yml" + self.output_path = Path(output_path).expanduser() + + if output_grouping in (None, "collection", "type"): + self.output_grouping = output_grouping + else: + raise ValueError(f"Unsupported output_grouping: {output_grouping}") + self.include_personal_collections = include_personal_collections + self.collection_includes = collection_includes or [] self.collection_excludes = collection_excludes or [] self.table_names: Mapping = {} @@ -467,11 +468,17 @@ def execute(self) -> Mapping: parsed_exposures = [] for collection in self.client.api("get", "/api/collection"): - # Exclude collections by name or personal collections (unless included) - if collection["name"] in self.collection_excludes or ( - collection.get("personal_owner_id") - and not self.include_personal_collections - ): + # Inclusion/exclusion criteria check + name_included = ( + collection["name"] in self.collection_includes + or not self.collection_includes + ) + name_excluded = collection["name"] in self.collection_excludes + personal_included = self.include_personal_collections or not collection.get( + "personal_owner_id" + ) + if not name_included or name_excluded or not personal_included: + logging.debug("Skipping collection %s", collection["name"]) continue # Iter through collection @@ -554,7 +561,7 @@ def execute(self) -> Mapping: exposure_label = exposure_name # Only letters, numbers and underscores allowed in model names in dbt docs DAG / no duplicate model names - exposure_name = re.sub(r"[^\w]", "_", exposure_name).lower() + exposure_name = safe_name(exposure_name) enumer = 1 while exposure_name in documented_exposure_names: exposure_name = f"{exposure_name}_{enumer}" @@ -562,37 +569,48 @@ def execute(self) -> Mapping: # Construct exposure parsed_exposures.append( - self._build_exposure( - exposure_type=exposure_type, - exposure_id=exposure_id, - name=exposure_name, - label=exposure_label, - header=header or "", - created_at=exposure["created_at"], - creator_name=creator_name or "", - creator_email=creator_email or "", - description=exposure.get("description", ""), - native_query=native_query, - ) + { + "id": item["id"], + "type": item["model"], + "collection": collection, + "exposure": self._build_exposure( + exposure_type=exposure_type, + exposure_id=exposure_id, + name=exposure_name, + label=exposure_label, + header=header or "", + created_at=exposure["created_at"], + creator_name=creator_name or "", + creator_email=creator_email or "", + description=exposure.get("description", ""), + native_query=native_query, + ), + } ) documented_exposure_names.append(exposure_name) - # Output dbt YAML - result = { - "version": self._RESOURCE_VERSION, - "exposures": parsed_exposures, - } - with open(self.output_file, "w", encoding="utf-8") as docs: - yaml.dump( - result, - docs, - Dumper=self.DbtDumper, - default_flow_style=False, - allow_unicode=True, - sort_keys=False, - ) - return result + for group, exposures in self._group_exposures(parsed_exposures).items(): + path = self.output_path.joinpath(*group[:-1]) / f"{group[-1]}.yml" + path.parent.mkdir(parents=True, exist_ok=True) + + exposures_unwrapped = map(lambda x: x["exposure"], exposures) + exposures_sorted = sorted(exposures_unwrapped, key=lambda x: x["name"]) + + with open(path, "w", encoding="utf-8") as f: + yaml.dump( + { + "version": self._RESOURCE_VERSION, + "exposures": exposures_sorted, + }, + f, + Dumper=self.DbtDumper, + default_flow_style=False, + allow_unicode=True, + sort_keys=False, + ) + + return {"exposures": parsed_exposures} # todo: decide on output? def _extract_card_exposures( self, @@ -751,7 +769,7 @@ def _build_exposure( return { "name": name, "label": label, - "description": description, + "description": safe_description(description), "type": "analysis" if exposure_type == "card" else "dashboard", "url": f"{self.client.url}/{exposure_type}/{exposure_id}", "maturity": "medium", @@ -768,6 +786,35 @@ def _build_exposure( ), } + def _group_exposures( + self, exposures: Iterable[Mapping] + ) -> Mapping[Tuple[str, ...], Iterable[Mapping]]: + """Group exposures by configured output grouping. + + Args: + exposures (Iterable[Mapping]): Collection of exposures. + + Returns: + Mapping[Tuple[str, ...], Iterable[Mapping]]: Exposures indexed by configured grouping. + """ + + results: Dict[Tuple[str, ...], List[Mapping]] = {} + + for exposure in exposures: + group: Tuple[str, ...] = ("exposures",) + if self.output_grouping == "collection": + collection = exposure["collection"] + group = (collection.get("slug") or safe_name(collection["name"]),) + elif self.output_grouping == "type": + group = (exposure["type"], exposure["id"]) + + result = results.get(group, []) + result.append(exposure) + if group not in results: + results[group] = result + + return results + class MetabaseClient: """Metabase API client.""" @@ -894,17 +941,19 @@ def extract_exposures( self, models: List[MetabaseModel], output_path: str = ".", - output_name: str = "metabase_exposures", + output_grouping: Optional[str] = None, include_personal_collections: bool = True, + collection_includes: Optional[Iterable] = None, collection_excludes: Optional[Iterable] = None, ) -> Mapping: """Extracts exposures in Metabase downstream of dbt models and sources as parsed by dbt reader. Args: models (List[MetabaseModel]): List of dbt models. - output_path (str, optional): Path for output YAML. Defaults to ".". - output_name (str, optional): Name for output YAML. Defaults to "metabase_exposures". + output_path (str, optional): Path for output files. Defaults to ".". + output_grouping (Optional[str], optional): Grouping for output YAML files, supported values: "collection" (by collection slug) or "type" (by entity type). Defaults to None. include_personal_collections (bool, optional): Include personal Metabase collections. Defaults to True. + collection_includes (Optional[Iterable], optional): Include certain Metabase collections. Defaults to None. collection_excludes (Optional[Iterable], optional): Exclude certain Metabase collections. Defaults to None. Returns: @@ -914,7 +963,8 @@ def extract_exposures( client=self, models=models, output_path=output_path, - output_name=output_name, + output_grouping=output_grouping, include_personal_collections=include_personal_collections, + collection_includes=collection_includes, collection_excludes=collection_excludes, ).execute() diff --git a/tests/fixtures/exposure/.gitignore b/tests/fixtures/exposure/.gitignore index 0031d8e4..b6104ec0 100644 --- a/tests/fixtures/exposure/.gitignore +++ b/tests/fixtures/exposure/.gitignore @@ -1 +1 @@ -unittest_exposures.yml \ No newline at end of file +exposures.yml diff --git a/tests/test_metabase.py b/tests/test_metabase.py index 65bcbc41..43669b58 100644 --- a/tests/test_metabase.py +++ b/tests/test_metabase.py @@ -269,20 +269,20 @@ def setUp(self): def test_exposures(self): output_path = FIXTURES_PATH / "exposure" - output_name = "unittest_exposures" job = _ExtractExposuresJob( client=self.client, models=MODELS, output_path=str(output_path), - output_name=output_name, + output_grouping=None, include_personal_collections=False, + collection_includes=None, collection_excludes=None, ) job.execute() with open(output_path / "baseline_test_exposures.yml", encoding="utf-8") as f: expected = yaml.safe_load(f) - with open(output_path / f"{output_name}.yml", encoding="utf-8") as f: + with open(output_path / "exposures.yml", encoding="utf-8") as f: actual = yaml.safe_load(f) expected_exposures = sorted(expected["exposures"], key=itemgetter("name"))