feat(ingestion/bigquery): BigQuery Owner Label to Datahub Ownership (d…
shubhamjagtap639 authored Mar 28, 2024
1 parent 32a2de4 commit 9f2c5d3
Showing 5 changed files with 236 additions and 32 deletions.
60 changes: 53 additions & 7 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -20,24 +20,70 @@ The below table shows transformer which can transform aspects of entity [Dataset
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|---------|---------------|---------------------------------------------|
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| `tag_prefix` | | str | | Regex matched against each tag (a prefix). The text up to and including the match is discarded, and the rest of the tag is used as the owner ID when creating the owner URN. |
| `is_user` | | bool | `true` | Whether the owner should be considered a user. If `false`, the owner is considered a group. |
| `tag_pattern` | | str | | Regex matched against each tag. The text up to and including the match is discarded, and the rest of the tag is used as the owner ID when creating the owner URN. |
| `is_user` | | bool | `true` | Whether the owner should be considered a user. If `false`, the owner is considered a group. |
| `owner_character_mapping` | | dict[str, str] | | Mapping from characters in the extracted owner string to the characters that replace them in the DataHub owner ID. |
| `email_domain` | | str | | If set, this is appended (after `@`) to the owner ID when creating the owner URN. |
| `extract_owner_type_from_tag_pattern` | | bool | `false` | Whether to extract the owner type from the first group of the provided tag pattern. If `true`, there is no need to provide the `owner_type` and `owner_type_urn` config. For example: if the provided tag pattern is `(.*)_owner_email:` and the actual tag is `developer_owner_email`, then the extracted owner type will be `developer`. |
| `owner_type` | | str | `TECHNICAL_OWNER` | Ownership type. |
| `owner_type_urn` | | str | `None` | Set to a custom ownership type's URN if using custom ownership. |

Matches against a tag prefix and considers string in tags after that prefix as owner to create ownership.
Let’s suppose we’d like to add dataset ownership based on part of a dataset’s tags. To do so, we can use the `extract_ownership_from_tags` transformer that’s included in the ingestion framework.

The config, which we’d append to our ingestion recipe YAML, would look like this:

```yaml
transformers:
  - type: "extract_ownership_from_tags"
    config:
      tag_prefix: "dbt:techno-genie:"
      is_user: true
      email_domain: "coolcompany.com"
      tag_pattern: "owner_email:"
```
So if we have input dataset tags like
- `urn:li:tag:dataset_owner_email:[email protected]`
- `urn:li:tag:dataset_owner_email:[email protected]`

The portion of the tag after the matched tag pattern will be converted into an owner. Hence users `[email protected]` and `[email protected]` will be added as owners.
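
The matching rule described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the transformer's own code: `extract_owner` is a hypothetical helper, and the tag name and domain are made-up values. It follows the documented behaviour: the pattern is searched for in the tag name, everything after the match becomes the owner ID, and `email_domain` is appended when set.

```python
import re
from typing import Optional


def extract_owner(
    tag_name: str,
    tag_pattern: str,
    email_domain: Optional[str] = None,
    is_user: bool = True,
) -> Optional[str]:
    """Hypothetical helper: build an owner URN from a tag name, or return None."""
    match = re.search(tag_pattern, tag_name)
    if match is None:
        return None  # the tag does not carry ownership information
    # Everything after the end of the match is treated as the owner ID.
    owner_id = tag_name[match.end():].strip()
    if email_domain:
        owner_id = f"{owner_id}@{email_domain}"
    entity = "corpuser" if is_user else "corpGroup"
    return f"urn:li:{entity}:{owner_id}"


# Made-up tag name and domain, for illustration only.
print(extract_owner("team_owner:data-eng", "owner:", email_domain="example.com", is_user=False))
# prints: urn:li:corpGroup:data-eng@example.com
```

Tags that do not match the pattern yield `None` in this sketch, which corresponds to the transformer simply skipping them.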

### Examples

- Add owners as groups when the email domain is not part of the tag string. For example, to turn the tag URN `urn:li:tag:dataset_owner:abc` into the owner URN `urn:li:corpGroup:abc@email.com`, the config would look like this:
```yaml
transformers:
  - type: "extract_ownership_from_tags"
    config:
      tag_pattern: "owner:"
      is_user: false
      email_domain: "email.com"
```
- Add owners with an owner type and owner type URN supplied in the config. For example, for the tag URN `urn:li:tag:dataset_owner_email:[email protected]`, to set the owner type to `CUSTOM` and the owner type URN to `"urn:li:ownershipType:data_product"`, the config would look like this:
```yaml
transformers:
  - type: "extract_ownership_from_tags"
    config:
      tag_pattern: "owner_email:"
      owner_type: "CUSTOM"
      owner_type_urn: "urn:li:ownershipType:data_product"
```
- Add owners when some characters in the owner string need to be replaced before ingestion. For example, to turn the tag URN `urn:li:tag:dataset_owner_email:abc_xyz-email_com` into the owner URN `urn:li:corpuser:abc.xyz@email.com`, the config would look like this:
```yaml
transformers:
  - type: "extract_ownership_from_tags"
    config:
      tag_pattern: "owner_email:"
      owner_character_mapping:
        "_": "."
        "-": "@"
```
- Add owners and also extract the owner type from the tag pattern. For example, for the tag URN `urn:li:tag:data_producer_owner_email:[email protected]`, to extract the owner type `data_producer`, the config would look like this:
```yaml
transformers:
  - type: "extract_ownership_from_tags"
    config:
      tag_pattern: "(.*)_owner_email:"
      extract_owner_type_from_tag_pattern: true
```
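
The last example relies on the first capture group of the regex. The following is a rough sketch of that step, assuming a hypothetical helper `split_owner_and_type` (it is not part of DataHub); the URN format mirrors `make_ownership_type_urn` from this commit, and the tag name is the one used in the commit's unit test.

```python
import re
from typing import Optional, Tuple


def split_owner_and_type(tag_name: str, tag_pattern: str) -> Optional[Tuple[str, str]]:
    """Hypothetical helper: return (owner_id, ownership_type_urn), or None."""
    match = re.search(tag_pattern, tag_name)
    # Without a match or without a capture group there is no owner type to extract.
    if match is None or not match.groups():
        return None
    owner_id = tag_name[match.end():].strip()
    # The first capture group becomes the custom ownership type.
    type_urn = f"urn:li:ownershipType:{match.group(1)}"
    return owner_id, type_urn


print(split_owner_and_type(
    "data_producer_owner_email:abc_xyz-email_com", "(.*)_owner_email:"
))
# prints: ('abc_xyz-email_com', 'urn:li:ownershipType:data_producer')
```

In the transformer itself the owner ID would additionally go through `owner_character_mapping` and `email_domain` handling before the owner URN is built; this sketch leaves those steps out.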

## Clean suffix prefix from Ownership
### Config Details
| Field | Required | Type | Default | Description |
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -246,6 +246,10 @@ def make_owner_urn(owner: str, owner_type: OwnerType) -> str:
    return f"urn:li:{owner_type.value}:{owner}"


def make_ownership_type_urn(type: str) -> str:
    return f"urn:li:ownershipType:{type}"


def make_term_urn(term: str) -> str:
    """
    Makes a term urn if the input is not a term urn already
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
import re
from functools import lru_cache
from typing import List, Optional, Sequence, Union, cast
from typing import Dict, List, Optional, Sequence, Union, cast

from datahub.configuration.common import TransformerSemanticsConfigModel
from datahub.emitter.mce_builder import Aspect
from datahub.configuration.common import ConfigModel
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.emitter.mce_builder import Aspect, make_ownership_type_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer
@@ -22,13 +23,19 @@
logger = logging.getLogger(__name__)


class ExtractOwnersFromTagsConfig(TransformerSemanticsConfigModel):
    tag_prefix: str
class ExtractOwnersFromTagsConfig(ConfigModel):
    tag_pattern: str = ""
    is_user: bool = True
    owner_character_mapping: Optional[Dict[str, str]] = None
    email_domain: Optional[str] = None
    extract_owner_type_from_tag_pattern: bool = False
    owner_type: str = "TECHNICAL_OWNER"
    owner_type_urn: Optional[str] = None

    _rename_tag_prefix_to_tag_pattern = pydantic_renamed_field(
        "tag_prefix", "tag_pattern"
    )


@lru_cache(maxsize=10)
def get_owner_type(owner_type_str: str) -> str:
@@ -63,6 +70,19 @@ def get_owner_urn(self, owner_str: str) -> str:
            return owner_str + "@" + self.config.email_domain
        return owner_str

    def convert_owner_as_per_mapping(self, owner: str) -> str:
        if self.config.owner_character_mapping:
            # Sort the provided mapping by its length.
            # Eg: Suppose we have {"_":".", "__":"#"} character mapping.
            # In this case "__" character should get replace first compare to "_" character.
            for key in sorted(
                self.config.owner_character_mapping.keys(),
                key=len,
                reverse=True,
            ):
                owner = owner.replace(key, self.config.owner_character_mapping[key])
        return owner
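
The longest-key-first ordering in `convert_owner_as_per_mapping` can be tried in isolation. Below is a standalone sketch of the same idea; `apply_character_mapping` is a hypothetical free function written for illustration, and the input strings are made up.

```python
from typing import Dict


def apply_character_mapping(owner: str, mapping: Dict[str, str]) -> str:
    """Standalone sketch: apply replacements, longest keys first."""
    # Longer keys go first so that "__" is handled before "_" can split it up.
    for key in sorted(mapping, key=len, reverse=True):
        owner = owner.replace(key, mapping[key])
    return owner


mapping = {"_": ".", "__": "#"}
print(apply_character_mapping("a__b_c", mapping))
# prints: a#b.c
```

If the single-character key were applied first, `a__b_c` would become `a..b.c` and the `__` rule would never fire. Note that the replacements run sequentially, so in this sketch the output of one replacement can still be rewritten by a later, shorter key (for example `{"__": "_", "_": "."}` turns `a__b` into `a.b`).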

    def handle_end_of_stream(
        self,
    ) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
@@ -79,29 +99,41 @@ def transform_aspect(
        owners: List[OwnerClass] = []

        for tag_class in tags:
            tag_urn = TagUrn.from_string(tag_class.tag)
            tag_str = tag_urn.entity_ids[0]
            re_match = re.search(self.config.tag_prefix, tag_str)
            tag_str = TagUrn.from_string(tag_class.tag).name
            re_match = re.search(self.config.tag_pattern, tag_str)
            if re_match:
                owner_str = tag_str[re_match.end() :].strip()
                owner_str = self.convert_owner_as_per_mapping(owner_str)
                owner_urn_str = self.get_owner_urn(owner_str)
                if self.config.is_user:
                    owner_urn = str(CorpuserUrn(owner_urn_str))
                owner_urn = (
                    str(CorpuserUrn(owner_urn_str))
                    if self.config.is_user
                    else str(CorpGroupUrn(owner_urn_str))
                )

                if self.config.extract_owner_type_from_tag_pattern:
                    if re_match.groups():
                        owners.append(
                            OwnerClass(
                                owner=owner_urn,
                                type=OwnershipTypeClass.CUSTOM,
                                typeUrn=make_ownership_type_urn(re_match.group(1)),
                            )
                        )
                else:
                    owner_urn = str(CorpGroupUrn(owner_urn_str))
                owner_type = get_owner_type(self.config.owner_type)
                if owner_type == OwnershipTypeClass.CUSTOM:
                    assert (
                        self.config.owner_type_urn is not None
                    ), "owner_type_urn must be set if owner_type is CUSTOM"

                owners.append(
                    OwnerClass(
                        owner=owner_urn,
                        type=owner_type,
                        typeUrn=self.config.owner_type_urn,
                    owner_type = get_owner_type(self.config.owner_type)
                    if owner_type == OwnershipTypeClass.CUSTOM:
                        assert (
                            self.config.owner_type_urn is not None
                        ), "owner_type_urn must be set if owner_type is CUSTOM"

                    owners.append(
                        OwnerClass(
                            owner=owner_urn,
                            type=owner_type,
                            typeUrn=self.config.owner_type_urn,
                        )
                    )
                )

        self.owner_mcps.append(
            MetadataChangeProposalWrapper(
@@ -111,5 +143,4 @@ def transform_aspect(
                ),
            )
        )

        return None
        return aspect
101 changes: 101 additions & 0 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
@@ -28,17 +28,27 @@
    BigqueryDataset,
    BigqueryProject,
    BigQuerySchemaApi,
    BigqueryTable,
    BigqueryTableSnapshot,
    BigqueryView,
)
from datahub.ingestion.source.bigquery_v2.lineage import (
    LineageEdge,
    LineageEdgeColumnMapping,
)
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.metadata.com.linkedin.pegasus2avro.dataset import ViewProperties
from datahub.metadata.schema_classes import (
    ContainerClass,
    DataPlatformInstanceClass,
    DatasetPropertiesClass,
    GlobalTagsClass,
    MetadataChangeProposalClass,
    SchemaMetadataClass,
    StatusClass,
    SubTypesClass,
    TagAssociationClass,
    TimeStampClass,
)


@@ -352,6 +362,97 @@ def test_get_projects_list_fully_filtered(get_projects_mock, get_bq_client_mock)
    assert projects == []


@pytest.fixture
def bigquery_table() -> BigqueryTable:
    now = datetime.now(tz=timezone.utc)
    return BigqueryTable(
        name="table1",
        comment="comment1",
        created=now,
        last_altered=now,
        size_in_bytes=2400,
        rows_count=2,
        expires=now - timedelta(days=10),
        labels={"data_producer_owner_email": "games_team-nytimes_com"},
        num_partitions=1,
        max_partition_id="1",
        max_shard_id="1",
        active_billable_bytes=2400,
        long_term_billable_bytes=2400,
    )


@patch.object(BigQueryV2Config, "get_bigquery_client")
def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table):
    project_id = "test-project"
    dataset_name = "test-dataset"
    config = BigQueryV2Config.parse_obj(
        {
            "project_id": project_id,
            "capture_table_label_as_tag": True,
        }
    )
    source: BigqueryV2Source = BigqueryV2Source(
        config=config, ctx=PipelineContext(run_id="test")
    )

    gen = source.gen_table_dataset_workunits(
        bigquery_table, [], project_id, dataset_name
    )
    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
    assert mcp.aspect == StatusClass(removed=False)

    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
    assert isinstance(mcp.aspect, SchemaMetadataClass)
    assert mcp.aspect.schemaName == f"{project_id}.{dataset_name}.{bigquery_table.name}"
    assert mcp.aspect.fields == []

    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
    assert isinstance(mcp.aspect, DatasetPropertiesClass)
    assert mcp.aspect.name == bigquery_table.name
    assert (
        mcp.aspect.qualifiedName == f"{project_id}.{dataset_name}.{bigquery_table.name}"
    )
    assert mcp.aspect.description == bigquery_table.comment
    assert mcp.aspect.created == TimeStampClass(
        time=int(bigquery_table.created.timestamp() * 1000)
    )
    assert mcp.aspect.lastModified == TimeStampClass(
        time=int(bigquery_table.last_altered.timestamp() * 1000)
    )
    assert mcp.aspect.tags == []

    assert mcp.aspect.customProperties == {
        "expiration_date": str(bigquery_table.expires),
        "size_in_bytes": str(bigquery_table.size_in_bytes),
        "billable_bytes_active": str(bigquery_table.active_billable_bytes),
        "billable_bytes_long_term": str(bigquery_table.long_term_billable_bytes),
        "number_of_partitions": str(bigquery_table.num_partitions),
        "max_partition_id": str(bigquery_table.max_partition_id),
        "is_partitioned": "True",
        "max_shard_id": str(bigquery_table.max_shard_id),
        "is_sharded": "True",
    }

    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
    assert isinstance(mcp.aspect, GlobalTagsClass)
    assert mcp.aspect.tags == [
        TagAssociationClass(
            "urn:li:tag:data_producer_owner_email:games_team-nytimes_com"
        )
    ]

    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
    assert isinstance(mcp.aspect, ContainerClass)

    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
    assert isinstance(mcp.aspect, DataPlatformInstanceClass)

    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
    assert isinstance(mcp.aspect, SubTypesClass)
    assert mcp.aspect.typeNames[1] == DatasetSubTypes.TABLE

@patch.object(BigQueryV2Config, "get_bigquery_client")
def test_simple_upstream_table_generation(get_bq_client_mock):
a: BigQueryTableRef = BigQueryTableRef(
22 changes: 22 additions & 0 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
@@ -643,6 +643,7 @@ def _test_owner(
        config: Dict,
        expected_owner: str,
        expected_owner_type: Optional[str] = None,
        expected_owner_type_urn: Optional[str] = None,
    ) -> None:
        dataset = make_generic_dataset(
            aspects=[
@@ -682,6 +683,8 @@ def _test_owner(

        assert owner.owner == expected_owner

        assert owner.typeUrn == expected_owner_type_urn

    _test_owner(
        tag="owner:foo",
        config={
@@ -736,6 +739,25 @@ def _test_owner(
        },
        expected_owner="urn:li:corpuser:[email protected]",
        expected_owner_type=OwnershipTypeClass.CUSTOM,
        expected_owner_type_urn="urn:li:ownershipType:ad8557d6-dcb9-4d2a-83fc-b7d0d54f3e0f",
    )
    _test_owner(
        tag="data_producer_owner_email:abc_xyz-email_com",
        config={
            "tag_pattern": "(.*)_owner_email:",
            "owner_character_mapping": {
                "_": ".",
                "-": "@",
                "__": "_",
                "--": "-",
                "_-": "#",
                "-_": " ",
            },
            "extract_owner_type_from_tag_pattern": True,
        },
        expected_owner="urn:li:corpuser:abc.xyz@email.com",
        expected_owner_type=OwnershipTypeClass.CUSTOM,
        expected_owner_type_urn="urn:li:ownershipType:data_producer",
    )
