Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Mar 28, 2024
2 parents 90460a2 + 9f2c5d3 commit 2820b98
Show file tree
Hide file tree
Showing 36 changed files with 2,725 additions and 1,123 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/airflow-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:
- name: Install dependencies
run: ./metadata-ingestion/scripts/install_deps.sh
- name: Install airflow package and test (extras ${{ matrix.extra_pip_requirements }})
run: ./gradlew -Pextra_pip_requirements='${{ matrix.extra_pip_requirements }}' -Pextra_pip_extras='${{ matrix.extra_pip_extras }}' :metadata-ingestion-modules:airflow-plugin:lint :metadata-ingestion-modules:airflow-plugin:testQuick
run: ./gradlew -Pextra_pip_requirements='${{ matrix.extra_pip_requirements }}' -Pextra_pip_extras='${{ matrix.extra_pip_extras }}' :metadata-ingestion-modules:airflow-plugin:build
- name: pip freeze show list installed
if: always()
run: source metadata-ingestion-modules/airflow-plugin/venv/bin/activate && pip freeze
Expand Down
2 changes: 1 addition & 1 deletion docker/postgres-setup/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ CREATE TABLE IF NOT EXISTS metadata_aspect_v2 (
create index timeIndex ON metadata_aspect_v2 (createdon);

-- create default records for datahub user if not exists
CREATE TEMP TABLE temp_metadata_aspect_v2 AS TABLE metadata_aspect_v2;
CREATE TEMP TABLE temp_metadata_aspect_v2 AS TABLE metadata_aspect_v2 WITH NO DATA;
INSERT INTO temp_metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby) VALUES(
'urn:li:corpuser:datahub',
'corpUserInfo',
Expand Down
3 changes: 2 additions & 1 deletion docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #10026 - The dbt `use_compiled_code` option has been removed, because we now support capturing both source and compiled dbt SQL. This can be configured using `include_compiled_code`, which will be default enabled in 0.13.1.
- #10055 - Assertion entities generated by dbt are now associated with the dbt dataset entity, and not the entity in the data warehouse.
- #10090 - For Redshift ingestion, `use_lineage_v2` is now enabled by default.

- #10147 - For looker ingestion, the browse paths for looker Dashboard, Chart, View, Explore have been updated to align with Looker UI. This does not affect URNs or lineage but primarily affects (improves) browsing experience.
-
### Potential Downtime

### Deprecations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,28 @@ class DatahubPlugin(AirflowPlugin):
name = "datahub_plugin"

if _USE_AIRFLOW_LISTENER_INTERFACE:
if not NEEDS_AIRFLOW_LISTENER_MODULE:
from datahub_airflow_plugin.datahub_listener import ( # type: ignore[misc]
get_airflow_plugin_listener,
try:
if not NEEDS_AIRFLOW_LISTENER_MODULE:
from datahub_airflow_plugin.datahub_listener import ( # type: ignore[misc]
get_airflow_plugin_listener,
)

listeners: list = list(filter(None, [get_airflow_plugin_listener()]))

else:
# On Airflow < 2.5, we need the listener to be a module.
# This is just a quick shim layer to make that work.
#
# Related Airflow change: https://github.com/apache/airflow/pull/27113.
import datahub_airflow_plugin._datahub_listener_module as _listener_module # type: ignore[misc]

listeners = [_listener_module]
except Exception as e:
logger.warning(
f"Failed to load the DataHub plugin's event listener: {e}",
exc_info=True,
)

listeners: list = list(filter(None, [get_airflow_plugin_listener()]))

else:
# On Airflow < 2.5, we need the listener to be a module.
# This is just a quick shim layer to make that work.
#
# Related Airflow change: https://github.com/apache/airflow/pull/27113.
import datahub_airflow_plugin._datahub_listener_module as _listener_module # type: ignore[misc]

listeners = [_listener_module]
listeners = []


if not _USE_AIRFLOW_LISTENER_INTERFACE:
Expand Down
60 changes: 53 additions & 7 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,70 @@ The below table shows transformer which can transform aspects of entity [Dataset
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|---------|---------------|---------------------------------------------|
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| `tag_prefix` | | str | | Regex to use for tags to match against. Supports Regex to match a prefix which is used to remove content. Rest of string is considered owner ID for creating owner URN. |
| `is_user` | | bool | `true` | Whether should be consider a user or not. If `false` then considered a group. |
| `tag_pattern` | | str | | Regex to use for tags to match against. Supports Regex to match a pattern which is used to remove content. Rest of string is considered owner ID for creating owner URN. |
| `is_user` | | bool | `true` | Whether should be consider a user or not. If `false` then considered a group. |
| `owner_character_mapping` | | dict[str, str] | | A mapping of extracted owner character to datahub owner character. |
| `email_domain` | | str | | If set then this is appended to create owner URN. |
| `extract_owner_type_from_tag_pattern` | | str | `false` | Whether to extract an owner type from provided tag pattern first group. If `true`, no need to provide owner_type and owner_type_urn config. For example: if provided tag pattern is `(.*)_owner_email:` and actual tag is `developer_owner_email`, then extracted owner type will be `developer`.|
| `owner_type` | | str | `TECHNICAL_OWNER` | Ownership type. |
| `owner_type_urn` | | str | `None` | Set to a custom ownership type's URN if using custom ownership. |

Matches against a tag prefix and considers string in tags after that prefix as owner to create ownership.
Let’s suppose we’d like to add a dataset ownerships based on part of dataset tags. To do so, we can use the `extract_ownership_from_tags` transformer that’s included in the ingestion framework.

The config, which we’d append to our ingestion recipe YAML, would look like this:

```yaml
transformers:
- type: "extract_ownership_from_tags"
config:
tag_prefix: "dbt:techno-genie:"
is_user: true
email_domain: "coolcompany.com"
tag_pattern: "owner_email:"
```
So if we have input dataset tag like
- `urn:li:tag:dataset_owner_email:[email protected]`
- `urn:li:tag:dataset_owner_email:[email protected]`

The portion of the tag after the matched tag pattern will be converted into an owner. Hence users `[email protected]` and `[email protected]` will be added as owners.

### Examples

- Add owners, however owner should be considered as group and also email domain not provided in tag string. For example: from tag urn `urn:li:tag:dataset_owner:abc` extracted owner urn should be `urn:li:corpGroup:[email protected]` then config would look like this:
```yaml
transformers:
- type: "extract_ownership_from_tags"
config:
tag_pattern: "owner:"
is_user: false
email_domain: "email.com"
```
- Add owners, however owner type and owner type urn wanted to provide externally. For example: from tag urn `urn:li:tag:dataset_owner_email:[email protected]` owner type should be `CUSTOM` and owner type urn as `"urn:li:ownershipType:data_product"` then config would look like this:
```yaml
transformers:
- type: "extract_ownership_from_tags"
config:
tag_pattern: "owner_email:"
owner_type: "CUSTOM"
owner_type_urn: "urn:li:ownershipType:data_product"
```
- Add owners, however some owner characters needs to replace with some other characters before ingestion. For example: from tag urn `urn:li:tag:dataset_owner_email:abc_xyz-email_com` extracted owner urn should be `urn:li:corpGroup:[email protected]` then config would look like this:
```yaml
transformers:
- type: "extract_ownership_from_tags"
config:
tag_pattern: "owner_email:"
owner_character_mapping:
"_": ".",
"-": "@",
```
- Add owners, however owner type also need to extracted from tag pattern. For example: from tag urn `urn:li:tag:data_producer_owner_email:[email protected]` extracted owner type should be `data_producer` then config would look like this:
```yaml
transformers:
- type: "extract_ownership_from_tags"
config:
tag_pattern: "(.*)_owner_email:"
extract_owner_type_from_tag_pattern: true
```

## Clean suffix prefix from Ownership
### Config Details
| Field | Required | Type | Default | Description |
Expand Down
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ def make_owner_urn(owner: str, owner_type: OwnerType) -> str:
return f"urn:li:{owner_type.value}:{owner}"


def make_ownership_type_urn(type: str) -> str:
return f"urn:li:ownershipType:{type}"


def make_term_urn(term: str) -> str:
"""
Makes a term urn if the input is not a term urn already
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,7 @@ class LookerDashboardElement:
type: Optional[str] = None
description: Optional[str] = None
input_fields: Optional[List[InputFieldElement]] = None
folder_path: Optional[str] = None # for independent looks.

def url(self, base_url: str) -> str:
# A dashboard element can use a look or just a raw query against an explore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ class LookerCommonConfig(EnvConfigMixin, PlatformInstanceConfigMixin):
)
explore_browse_pattern: LookerNamingPattern = pydantic.Field(
description=f"Pattern for providing browse paths to explores. {LookerNamingPattern.allowed_docstring()}",
default=LookerNamingPattern(pattern="/{env}/{platform}/{project}/explores"),
default=LookerNamingPattern(pattern="/Explore/{project}/{model}"),
)
view_naming_pattern: LookerViewNamingPattern = Field(
LookerViewNamingPattern(pattern="{project}.view.{name}"),
description=f"Pattern for providing dataset names to views. {LookerViewNamingPattern.allowed_docstring()}",
)
view_browse_pattern: LookerViewNamingPattern = Field(
LookerViewNamingPattern(pattern="/{env}/{platform}/{project}/views"),
LookerViewNamingPattern(pattern="/Develop/{project}/{file_path}"),
description=f"Pattern for providing browse paths to views. {LookerViewNamingPattern.allowed_docstring()}",
)
tag_measures_and_dimensions: bool = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,12 @@ def _get_looker_dashboard_element( # noqa: C901
else:
slug = ""

look_folder_path = None
if element.look.folder is not None:
look_folder_path = self._get_folder_path(
element.look.folder, self.looker_api
)

return LookerDashboardElement(
id=element.id,
title=title,
Expand All @@ -464,6 +470,7 @@ def _get_looker_dashboard_element( # noqa: C901
for exp in explores
],
input_fields=input_fields,
folder_path=look_folder_path,
)

# Failing the above two approaches, pick out details from result_maker
Expand Down Expand Up @@ -524,10 +531,12 @@ def _get_looker_dashboard_element( # noqa: C901
type=element.type,
description=element.subtitle_text,
look_id=element.look_id,
query_slug=element.result_maker.query.slug
if element.result_maker.query is not None
and element.result_maker.query.slug is not None
else "",
query_slug=(
element.result_maker.query.slug
if element.result_maker.query is not None
and element.result_maker.query.slug is not None
else ""
),
upstream_explores=[
LookerExplore(model_name=model, name=exp) for exp in explores
],
Expand Down Expand Up @@ -603,18 +612,29 @@ def _make_chart_metadata_events(
chartUrl=dashboard_element.url(self.source_config.external_base_url or ""),
inputs=dashboard_element.get_view_urns(self.source_config),
customProperties={
"upstream_fields": ",".join(
sorted(set(field.name for field in dashboard_element.input_fields))
"upstream_fields": (
",".join(
sorted(
set(field.name for field in dashboard_element.input_fields)
)
)
if dashboard_element.input_fields
else ""
)
if dashboard_element.input_fields
else ""
},
)
chart_snapshot.aspects.append(chart_info)

if dashboard and dashboard.folder_path is not None:
browse_path = BrowsePathsClass(
paths=[f"/looker/{dashboard.folder_path}/{dashboard.title}"]
paths=[f"/Folders/{dashboard.folder_path}/{dashboard.title}"]
)
chart_snapshot.aspects.append(browse_path)
elif (
dashboard is None and dashboard_element.folder_path is not None
): # independent look
browse_path = BrowsePathsClass(
paths=[f"/Folders/{dashboard_element.folder_path}"]
)
chart_snapshot.aspects.append(browse_path)

Expand Down Expand Up @@ -673,7 +693,7 @@ def _make_dashboard_metadata_events(
dashboard_snapshot.aspects.append(dashboard_info)
if looker_dashboard.folder_path is not None:
browse_path = BrowsePathsClass(
paths=[f"/looker/{looker_dashboard.folder_path}"]
paths=[f"/Folders/{looker_dashboard.folder_path}"]
)
dashboard_snapshot.aspects.append(browse_path)

Expand Down Expand Up @@ -1084,10 +1104,12 @@ def process_dashboard(
looker_dashboard = self._get_looker_dashboard(dashboard_object, self.looker_api)
mces = self._make_dashboard_and_chart_mces(looker_dashboard)
workunits = [
MetadataWorkUnit(id=f"looker-{mce.proposedSnapshot.urn}", mce=mce)
if isinstance(mce, MetadataChangeEvent)
else MetadataWorkUnit(
id=f"looker-{mce.aspectName}-{mce.entityUrn}", mcp=mce
(
MetadataWorkUnit(id=f"looker-{mce.proposedSnapshot.urn}", mce=mce)
if isinstance(mce, MetadataChangeEvent)
else MetadataWorkUnit(
id=f"looker-{mce.aspectName}-{mce.entityUrn}", mcp=mce
)
)
for mce in mces
]
Expand Down Expand Up @@ -1177,12 +1199,7 @@ def extract_independent_looks(self) -> Iterable[MetadataWorkUnit]:
self.reporter.report_stage_start("extract_independent_looks")

logger.debug("Extracting looks not part of Dashboard")
look_fields: List[str] = [
"id",
"title",
"description",
"query_id",
]
look_fields: List[str] = ["id", "title", "description", "query_id", "folder"]
query_fields: List[str] = [
"id",
"view",
Expand Down Expand Up @@ -1227,8 +1244,8 @@ def extract_independent_looks(self) -> Iterable[MetadataWorkUnit]:
subtitle_text=look.description,
look_id=look.id,
dashboard_id=None, # As this is independent look
look=LookWithQuery(query=query),
)
look=LookWithQuery(query=query, folder=look.folder),
),
)

if dashboard_element is not None:
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ def _get_column_type(

args = [array_type]

elif type_name in ["struct", "record"]:
# STRUCT is not part of the SQLalchemy types selection
elif type_name in ["struct", "record", "row"]:
# STRUCT and ROW are not part of the SQLalchemy types selection
# but is provided by another official SQLalchemy library and
# compatible with the other SQLalchemy types
detected_col_type = STRUCT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ def resolve_vertica_modified_type(type_string: str) -> Any:
"struct": RecordType,
"map": MapType,
"array": ArrayType,
"row": RecordType,
}

# https://www.vertica.com/docs/11.1.x/HTML/Content/Authoring/SQLReferenceManual/DataTypes/SQLDataTypes.htm
Expand Down
Loading

0 comments on commit 2820b98

Please sign in to comment.