Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Dec 18, 2024
2 parents 0c17eec + 2285436 commit 465ee84
Show file tree
Hide file tree
Showing 12 changed files with 434 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,18 @@

from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.global_context import get_graph_context, set_graph_context
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
PropertyValueClass,
StructuredPropertyDefinitionClass,
)
from datahub.utilities.urns.urn import Urn
from datahub.metadata.urns import StructuredPropertyUrn, Urn
from datahub.utilities.urns._urn_base import URN_TYPES

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class StructuredPropertiesConfig:
"""Configuration class to hold the graph client"""

@classmethod
def get_graph_required(cls) -> DataHubGraph:
"""Get the current graph, falling back to default if none set"""
return get_graph_context() or get_default_graph()


class AllowedTypes(Enum):
STRING = "string"
RICH_TEXT = "rich_text"
Expand All @@ -51,29 +42,28 @@ class AllowedValue(ConfigModel):
description: Optional[str] = None


VALID_ENTITY_TYPES_PREFIX_STRING = ", ".join(
[
f"urn:li:entityType:datahub.{x}"
for x in ["dataset", "dashboard", "dataFlow", "schemaField"]
]
)
VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are {VALID_ENTITY_TYPES_PREFIX_STRING}, etc... Ensure that the entity type is valid."
VALID_ENTITY_TYPE_URNS = [
Urn.make_entity_type_urn(entity_type) for entity_type in URN_TYPES.keys()
]
_VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are {', '.join(VALID_ENTITY_TYPE_URNS)}, etc... Ensure that the entity type is valid."


def _validate_entity_type_urn(v: str) -> str:
urn = Urn.make_entity_type_urn(v)
if urn not in VALID_ENTITY_TYPE_URNS:
raise ValueError(
f"Input {v} is not a valid entity type urn. {_VALID_ENTITY_TYPES_STRING}"
)
v = str(urn)
return v


class TypeQualifierAllowedTypes(ConfigModel):
allowed_types: List[str]

@validator("allowed_types", each_item=True)
def validate_allowed_types(cls, v):
if v:
graph = StructuredPropertiesConfig.get_graph_required()
validated_urn = Urn.make_entity_type_urn(v)
if not graph.exists(validated_urn):
raise ValueError(
f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}"
)
v = str(validated_urn)
return v
_check_allowed_types = validator("allowed_types", each_item=True, allow_reuse=True)(
_validate_entity_type_urn
)


class StructuredProperties(ConfigModel):
Expand All @@ -90,22 +80,30 @@ class StructuredProperties(ConfigModel):
type_qualifier: Optional[TypeQualifierAllowedTypes] = None
immutable: Optional[bool] = False

@validator("entity_types", each_item=True)
def validate_entity_types(cls, v):
if v:
graph = StructuredPropertiesConfig.get_graph_required()
validated_urn = Urn.make_entity_type_urn(v)
if not graph.exists(validated_urn):
raise ValueError(
f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}"
)
v = str(validated_urn)
_check_entity_types = validator("entity_types", each_item=True, allow_reuse=True)(
_validate_entity_type_urn
)

@validator("type")
def validate_type(cls, v: str) -> str:
# Convert to lowercase if needed
if not v.islower():
logger.warning(
f"Structured property type should be lowercase. Updated to {v.lower()}"
)
v = v.lower()

# Check if type is allowed
if not AllowedTypes.check_allowed_type(v):
raise ValueError(
f"Type {v} is not allowed. Allowed types are {AllowedTypes.values()}"
)
return v

@property
def fqn(self) -> str:
assert self.urn is not None
id = Urn.create_from_string(self.urn).get_entity_id()[0]
id = StructuredPropertyUrn.from_string(self.urn).id
if self.qualified_name is not None:
# ensure that qualified name and ID match
assert (
Expand All @@ -122,101 +120,90 @@ def urn_must_be_present(cls, v, values):
return v

@staticmethod
def create(file: str, graph: Optional[DataHubGraph] = None) -> None:
with set_graph_context(graph):
graph = StructuredPropertiesConfig.get_graph_required()

with open(file) as fp:
structuredproperties: List[dict] = yaml.safe_load(fp)
for structuredproperty_raw in structuredproperties:
structuredproperty = StructuredProperties.parse_obj(
structuredproperty_raw
)

if not structuredproperty.type.islower():
structuredproperty.type = structuredproperty.type.lower()
logger.warning(
f"Structured property type should be lowercase. Updated to {structuredproperty.type}"
)
if not AllowedTypes.check_allowed_type(structuredproperty.type):
raise ValueError(
f"Type {structuredproperty.type} is not allowed. Allowed types are {AllowedTypes.values()}"
)
mcp = MetadataChangeProposalWrapper(
entityUrn=structuredproperty.urn,
aspect=StructuredPropertyDefinitionClass(
qualifiedName=structuredproperty.fqn,
valueType=Urn.make_data_type_urn(structuredproperty.type),
displayName=structuredproperty.display_name,
description=structuredproperty.description,
entityTypes=[
Urn.make_entity_type_urn(entity_type)
for entity_type in structuredproperty.entity_types or []
],
cardinality=structuredproperty.cardinality,
immutable=structuredproperty.immutable,
allowedValues=(
[
PropertyValueClass(
value=v.value, description=v.description
)
for v in structuredproperty.allowed_values
]
if structuredproperty.allowed_values
else None
),
typeQualifier=(
{
"allowedTypes": structuredproperty.type_qualifier.allowed_types
}
if structuredproperty.type_qualifier
else None
),
),
)
graph.emit_mcp(mcp)

logger.info(f"Created structured property {structuredproperty.urn}")

@classmethod
def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
with set_graph_context(graph):
structured_property: Optional[
StructuredPropertyDefinitionClass
] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
if structured_property is None:
raise Exception(
"StructuredPropertyDefinition aspect is None. Unable to create structured property."
)
return StructuredProperties(
urn=urn,
qualified_name=structured_property.qualifiedName,
display_name=structured_property.displayName,
type=structured_property.valueType,
description=structured_property.description,
entity_types=structured_property.entityTypes,
cardinality=structured_property.cardinality,
allowed_values=(
def from_yaml(file: str) -> List["StructuredProperties"]:
with open(file) as fp:
structuredproperties: List[dict] = yaml.safe_load(fp)

result: List[StructuredProperties] = []
for structuredproperty_raw in structuredproperties:
result.append(StructuredProperties.parse_obj(structuredproperty_raw))
return result

def generate_mcps(self) -> List[MetadataChangeProposalWrapper]:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=StructuredPropertyDefinitionClass(
qualifiedName=self.fqn,
valueType=Urn.make_data_type_urn(self.type),
displayName=self.display_name,
description=self.description,
entityTypes=[
Urn.make_entity_type_urn(entity_type)
for entity_type in self.entity_types or []
],
cardinality=self.cardinality,
immutable=self.immutable,
allowedValues=(
[
AllowedValue(
value=av.value,
description=av.description,
)
for av in structured_property.allowedValues or []
PropertyValueClass(value=v.value, description=v.description)
for v in self.allowed_values
]
if structured_property.allowedValues is not None
if self.allowed_values
else None
),
type_qualifier=(
{
"allowed_types": structured_property.typeQualifier.get(
"allowedTypes"
)
}
if structured_property.typeQualifier
typeQualifier=(
{"allowedTypes": self.type_qualifier.allowed_types}
if self.type_qualifier
else None
),
),
)
return [mcp]

@staticmethod
def create(file: str, graph: DataHubGraph) -> None:
# TODO: Deprecate this method.
structuredproperties = StructuredProperties.from_yaml(file)
for structuredproperty in structuredproperties:
for mcp in structuredproperty.generate_mcps():
graph.emit_mcp(mcp)

logger.info(f"Created structured property {structuredproperty.urn}")

@classmethod
def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
structured_property: Optional[
StructuredPropertyDefinitionClass
] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
if structured_property is None:
raise Exception(
"StructuredPropertyDefinition aspect is None. Unable to create structured property."
)
return StructuredProperties(
urn=urn,
qualified_name=structured_property.qualifiedName,
display_name=structured_property.displayName,
type=structured_property.valueType,
description=structured_property.description,
entity_types=structured_property.entityTypes,
cardinality=structured_property.cardinality,
allowed_values=(
[
AllowedValue(
value=av.value,
description=av.description,
)
for av in structured_property.allowedValues or []
]
if structured_property.allowedValues is not None
else None
),
type_qualifier=(
{"allowed_types": structured_property.typeQualifier.get("allowedTypes")}
if structured_property.typeQualifier
else None
),
)

def to_yaml(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def properties() -> None:
def upsert(file: Path) -> None:
"""Upsert structured properties in DataHub."""

StructuredProperties.create(str(file))
with get_default_graph() as graph:
StructuredProperties.create(str(file), graph)


@properties.command(
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ def infos(self) -> LossyList[StructuredLogEntry]:

@dataclass
class SourceReport(Report):
event_not_produced_warn: bool = True
events_produced: int = 0
events_produced_per_sec: int = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def auto_workunit_reporter(report: "SourceReport", stream: Iterable[T]) -> Itera
report.report_workunit(wu)
yield wu

if report.events_produced == 0:
if report.event_not_produced_warn and report.events_produced == 0:
report.warning(
title="No metadata was produced by the source",
message="Please check the source configuration, filters, and permissions.",
Expand Down
Loading

0 comments on commit 465ee84

Please sign in to comment.