From 5946558c01c0b3f99effe8fd7fd11ba30c892a1f Mon Sep 17 00:00:00 2001
From: Alice-sky <1835063592@qq.com>
Date: Wed, 18 Dec 2024 15:21:41 +0800
Subject: [PATCH 1/3] fix(ingest/pulsar): handle Avro schema with missing
 namespace or name (#12058)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Alice
Co-authored-by: Shirshanka Das
Co-authored-by: Sergio Gómez Villamor
Co-authored-by: Harshal Sheth
---
 .../src/datahub/ingestion/source/pulsar.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
index 15ee995b2d5fd..f71949b9eb27f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
@@ -89,7 +89,16 @@ def __init__(self, schema):
             logger.error(f"Invalid JSON schema: {schema_data}. Error: {str(e)}")
             avro_schema = {}
 
-        self.schema_name = avro_schema.get("namespace") + "." + avro_schema.get("name")
+        self.schema_name = "null"
+        if avro_schema.get("namespace") and avro_schema.get("name"):
+            self.schema_name = (
+                avro_schema.get("namespace") + "." + avro_schema.get("name")
+            )
+        elif avro_schema.get("namespace"):
+            self.schema_name = avro_schema.get("namespace")
+        elif avro_schema.get("name"):
+            self.schema_name = avro_schema.get("name")
+
         self.schema_description = avro_schema.get("doc")
         self.schema_type = schema.get("type")
         self.schema_str = schema.get("data")

From 76cfac3700f261dd87d0c494235ea8c1635bd7ec Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Wed, 18 Dec 2024 04:04:51 -0500
Subject: [PATCH 2/3] fix(cli/properties): allow structured properties without
 a graph instance (#12144)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Sergio Gómez Villamor
---
 .../structuredproperties.py                   | 245 +++++++++---------
 .../cli/specific/structuredproperties_cli.py  |   3 +-
 .../entities/structuredproperties/__init__.py |   0
 .../example_structured_properties_golden.json | 194 ++++++++++++++
 .../test_structuredproperties.py              |  38 +++
 .../tests/unit/serde/test_codegen.py          |   7 +
 6 files changed, 357 insertions(+), 130 deletions(-)
 create mode 100644 metadata-ingestion/tests/unit/api/entities/structuredproperties/__init__.py
 create mode 100644 metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json
 create mode 100644 metadata-ingestion/tests/unit/api/entities/structuredproperties/test_structuredproperties.py

diff --git a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py
index fd3fe7ca098ec..e37281dea86e1 100644
--- a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py
+++ b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py
@@ -9,27 +9,18 @@
 
 from datahub.configuration.common import ConfigModel
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.ingestion.api.global_context import get_graph_context, set_graph_context
-from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
+from datahub.ingestion.graph.client import DataHubGraph
 from datahub.metadata.schema_classes import (
     PropertyValueClass,
     StructuredPropertyDefinitionClass,
 )
-from datahub.utilities.urns.urn import Urn
+from datahub.metadata.urns import StructuredPropertyUrn, Urn
+from datahub.utilities.urns._urn_base import URN_TYPES
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
 
-class StructuredPropertiesConfig:
-    """Configuration class to hold the graph client"""
-
-    @classmethod
-    def get_graph_required(cls) -> DataHubGraph:
-        """Get the current graph, falling back to default if none set"""
-        return get_graph_context() or get_default_graph()
-
-
 class AllowedTypes(Enum):
     STRING = "string"
     RICH_TEXT = "rich_text"
@@ -51,29 +42,28 @@ class AllowedValue(ConfigModel):
     description: Optional[str] = None
 
 
-VALID_ENTITY_TYPES_PREFIX_STRING = ", ".join(
-    [
-        f"urn:li:entityType:datahub.{x}"
-        for x in ["dataset", "dashboard", "dataFlow", "schemaField"]
-    ]
-)
-VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are {VALID_ENTITY_TYPES_PREFIX_STRING}, etc... Ensure that the entity type is valid."
+VALID_ENTITY_TYPE_URNS = [
+    Urn.make_entity_type_urn(entity_type) for entity_type in URN_TYPES.keys()
+]
+_VALID_ENTITY_TYPES_STRING = f"Valid entity type urns are {', '.join(VALID_ENTITY_TYPE_URNS)}, etc... Ensure that the entity type is valid."
+
+
+def _validate_entity_type_urn(v: str) -> str:
+    urn = Urn.make_entity_type_urn(v)
+    if urn not in VALID_ENTITY_TYPE_URNS:
+        raise ValueError(
+            f"Input {v} is not a valid entity type urn. {_VALID_ENTITY_TYPES_STRING}"
+        )
+    v = str(urn)
+    return v
 
 
 class TypeQualifierAllowedTypes(ConfigModel):
     allowed_types: List[str]
 
-    @validator("allowed_types", each_item=True)
-    def validate_allowed_types(cls, v):
-        if v:
-            graph = StructuredPropertiesConfig.get_graph_required()
-            validated_urn = Urn.make_entity_type_urn(v)
-            if not graph.exists(validated_urn):
-                raise ValueError(
-                    f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}"
-                )
-            v = str(validated_urn)
-        return v
+    _check_allowed_types = validator("allowed_types", each_item=True, allow_reuse=True)(
+        _validate_entity_type_urn
+    )
 
 
 class StructuredProperties(ConfigModel):
@@ -90,22 +80,30 @@ class StructuredProperties(ConfigModel):
     type_qualifier: Optional[TypeQualifierAllowedTypes] = None
     immutable: Optional[bool] = False
 
-    @validator("entity_types", each_item=True)
-    def validate_entity_types(cls, v):
-        if v:
-            graph = StructuredPropertiesConfig.get_graph_required()
-            validated_urn = Urn.make_entity_type_urn(v)
-            if not graph.exists(validated_urn):
-                raise ValueError(
-                    f"Input {v} is not a valid entity type urn. {VALID_ENTITY_TYPES_STRING}"
-                )
-            v = str(validated_urn)
+    _check_entity_types = validator("entity_types", each_item=True, allow_reuse=True)(
+        _validate_entity_type_urn
+    )
+
+    @validator("type")
+    def validate_type(cls, v: str) -> str:
+        # Convert to lowercase if needed
+        if not v.islower():
+            logger.warning(
+                f"Structured property type should be lowercase. Updated to {v.lower()}"
+            )
+            v = v.lower()
+
+        # Check if type is allowed
+        if not AllowedTypes.check_allowed_type(v):
+            raise ValueError(
+                f"Type {v} is not allowed. Allowed types are {AllowedTypes.values()}"
+            )
         return v
 
     @property
     def fqn(self) -> str:
         assert self.urn is not None
-        id = Urn.create_from_string(self.urn).get_entity_id()[0]
+        id = StructuredPropertyUrn.from_string(self.urn).id
         if self.qualified_name is not None:
             # ensure that qualified name and ID match
             assert (
@@ -122,101 +120,90 @@ def urn_must_be_present(cls, v, values):
         return v
 
     @staticmethod
-    def create(file: str, graph: Optional[DataHubGraph] = None) -> None:
-        with set_graph_context(graph):
-            graph = StructuredPropertiesConfig.get_graph_required()
-
-            with open(file) as fp:
-                structuredproperties: List[dict] = yaml.safe_load(fp)
-                for structuredproperty_raw in structuredproperties:
-                    structuredproperty = StructuredProperties.parse_obj(
-                        structuredproperty_raw
-                    )
-
-                    if not structuredproperty.type.islower():
-                        structuredproperty.type = structuredproperty.type.lower()
-                        logger.warning(
-                            f"Structured property type should be lowercase. Updated to {structuredproperty.type}"
-                        )
-                    if not AllowedTypes.check_allowed_type(structuredproperty.type):
-                        raise ValueError(
-                            f"Type {structuredproperty.type} is not allowed. Allowed types are {AllowedTypes.values()}"
-                        )
-                    mcp = MetadataChangeProposalWrapper(
-                        entityUrn=structuredproperty.urn,
-                        aspect=StructuredPropertyDefinitionClass(
-                            qualifiedName=structuredproperty.fqn,
-                            valueType=Urn.make_data_type_urn(structuredproperty.type),
-                            displayName=structuredproperty.display_name,
-                            description=structuredproperty.description,
-                            entityTypes=[
-                                Urn.make_entity_type_urn(entity_type)
-                                for entity_type in structuredproperty.entity_types or []
-                            ],
-                            cardinality=structuredproperty.cardinality,
-                            immutable=structuredproperty.immutable,
-                            allowedValues=(
-                                [
-                                    PropertyValueClass(
-                                        value=v.value, description=v.description
-                                    )
-                                    for v in structuredproperty.allowed_values
-                                ]
-                                if structuredproperty.allowed_values
-                                else None
-                            ),
-                            typeQualifier=(
-                                {
-                                    "allowedTypes": structuredproperty.type_qualifier.allowed_types
-                                }
-                                if structuredproperty.type_qualifier
-                                else None
-                            ),
-                        ),
-                    )
-                    graph.emit_mcp(mcp)
-
-                    logger.info(f"Created structured property {structuredproperty.urn}")
-
-    @classmethod
-    def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
-        with set_graph_context(graph):
-            structured_property: Optional[
-                StructuredPropertyDefinitionClass
-            ] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
-            if structured_property is None:
-                raise Exception(
-                    "StructuredPropertyDefinition aspect is None. Unable to create structured property."
-                )
-            return StructuredProperties(
-                urn=urn,
-                qualified_name=structured_property.qualifiedName,
-                display_name=structured_property.displayName,
-                type=structured_property.valueType,
-                description=structured_property.description,
-                entity_types=structured_property.entityTypes,
-                cardinality=structured_property.cardinality,
-                allowed_values=(
+    def from_yaml(file: str) -> List["StructuredProperties"]:
+        with open(file) as fp:
+            structuredproperties: List[dict] = yaml.safe_load(fp)
+
+            result: List[StructuredProperties] = []
+            for structuredproperty_raw in structuredproperties:
+                result.append(StructuredProperties.parse_obj(structuredproperty_raw))
+            return result
+
+    def generate_mcps(self) -> List[MetadataChangeProposalWrapper]:
+        mcp = MetadataChangeProposalWrapper(
+            entityUrn=self.urn,
+            aspect=StructuredPropertyDefinitionClass(
+                qualifiedName=self.fqn,
+                valueType=Urn.make_data_type_urn(self.type),
+                displayName=self.display_name,
+                description=self.description,
+                entityTypes=[
+                    Urn.make_entity_type_urn(entity_type)
+                    for entity_type in self.entity_types or []
+                ],
+                cardinality=self.cardinality,
+                immutable=self.immutable,
+                allowedValues=(
                     [
-                        AllowedValue(
-                            value=av.value,
-                            description=av.description,
-                        )
-                        for av in structured_property.allowedValues or []
+                        PropertyValueClass(value=v.value, description=v.description)
+                        for v in self.allowed_values
                     ]
-                    if structured_property.allowedValues is not None
+                    if self.allowed_values
                     else None
                 ),
-                type_qualifier=(
-                    {
-                        "allowed_types": structured_property.typeQualifier.get(
-                            "allowedTypes"
-                        )
-                    }
-                    if structured_property.typeQualifier
+                typeQualifier=(
+                    {"allowedTypes": self.type_qualifier.allowed_types}
+                    if self.type_qualifier
                     else None
                 ),
+            ),
+        )
+        return [mcp]
+
+    @staticmethod
+    def create(file: str, graph: DataHubGraph) -> None:
+        # TODO: Deprecate this method.
+        structuredproperties = StructuredProperties.from_yaml(file)
+        for structuredproperty in structuredproperties:
+            for mcp in structuredproperty.generate_mcps():
+                graph.emit_mcp(mcp)
+
+            logger.info(f"Created structured property {structuredproperty.urn}")
+
+    @classmethod
+    def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
+        structured_property: Optional[
+            StructuredPropertyDefinitionClass
+        ] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
+        if structured_property is None:
+            raise Exception(
+                "StructuredPropertyDefinition aspect is None. Unable to create structured property."
+            )
+        return StructuredProperties(
+            urn=urn,
+            qualified_name=structured_property.qualifiedName,
+            display_name=structured_property.displayName,
+            type=structured_property.valueType,
+            description=structured_property.description,
+            entity_types=structured_property.entityTypes,
+            cardinality=structured_property.cardinality,
+            allowed_values=(
+                [
+                    AllowedValue(
+                        value=av.value,
+                        description=av.description,
+                    )
+                    for av in structured_property.allowedValues or []
+                ]
+                if structured_property.allowedValues is not None
+                else None
+            ),
+            type_qualifier=(
+                {"allowed_types": structured_property.typeQualifier.get("allowedTypes")}
+                if structured_property.typeQualifier
+                else None
+            ),
+        )
 
     def to_yaml(
         self,
diff --git a/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py b/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py
index 4162d44b9b0ea..42285cf13a5dd 100644
--- a/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py
+++ b/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py
@@ -31,7 +31,8 @@ def properties() -> None:
 def upsert(file: Path) -> None:
     """Upsert structured properties in DataHub."""
 
-    StructuredProperties.create(str(file))
+    with get_default_graph() as graph:
+        StructuredProperties.create(str(file), graph)
 
 
 @properties.command(
diff --git a/metadata-ingestion/tests/unit/api/entities/structuredproperties/__init__.py b/metadata-ingestion/tests/unit/api/entities/structuredproperties/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json b/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json
new file mode 100644
index 0000000000000..29386ece7b0ca
--- /dev/null
+++ b/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json
@@ -0,0 +1,194 @@
+[
+{
+  "entityType": "structuredProperty",
+  "entityUrn": "urn:li:structuredProperty:io.acryl.privacy.retentionTime",
+  "changeType": "UPSERT",
+  "aspectName": "propertyDefinition",
+  "aspect": {
+    "json": {
+      "qualifiedName": "io.acryl.privacy.retentionTime",
+      "displayName": "Retention Time",
+      "valueType": "urn:li:dataType:datahub.number",
+      "allowedValues": [
+        {
+          "value": {
+            "string": "30"
+          },
+          "description": "30 days, usually reserved for datasets that are ephemeral and contain pii"
+        },
+        {
+          "value": {
+            "string": "90"
+          },
+          "description": "Use this for datasets that drive monthly reporting but contain pii"
+        },
+        {
+          "value": {
+            "string": "365"
+          },
+          "description": "Use this for non-sensitive data that can be retained for longer"
+        }
+      ],
+      "cardinality": "MULTIPLE",
+      "entityTypes": [
+        "urn:li:entityType:datahub.dataset",
+        "urn:li:entityType:datahub.dataFlow"
+      ],
+      "description": "Retention Time is used to figure out how long to retain records in a dataset",
+      "immutable": false
+    }
+  }
+},
+{
+  "entityType": "structuredProperty",
+  "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.replicationSLA",
+  "changeType": "UPSERT",
+  "aspectName": "propertyDefinition",
+  "aspect": {
+    "json": {
+      "qualifiedName": "io.acryl.dataManagement.replicationSLA",
+      "displayName": "Replication SLA",
+      "valueType": "urn:li:dataType:datahub.number",
+      "cardinality": "SINGLE",
+      "entityTypes": [
+        "urn:li:entityType:datahub.dataset"
+      ],
+      "description": "SLA for how long data can be delayed before replicating to the destination cluster",
+      "immutable": false
+    }
+  }
+},
+{
+  "entityType": "structuredProperty",
+  "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.deprecationDate",
+  "changeType": "UPSERT",
+  "aspectName": "propertyDefinition",
+  "aspect": {
+    "json": {
+      "qualifiedName": "io.acryl.dataManagement.deprecationDate",
+      "displayName": "Deprecation Date",
+      "valueType": "urn:li:dataType:datahub.date",
+      "cardinality": "SINGLE",
+      "entityTypes": [
+        "urn:li:entityType:datahub.dataset",
+        "urn:li:entityType:datahub.dataFlow",
+        "urn:li:entityType:datahub.dataJob"
+      ],
+      "immutable": false
+    }
+  }
+},
+{
+  "entityType": "structuredProperty",
+  "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.steward",
+  "changeType": "UPSERT",
+  "aspectName": "propertyDefinition",
+  "aspect": {
+    "json": {
+      "qualifiedName": "io.acryl.dataManagement.steward",
+      "displayName": "Steward",
+      "valueType": "urn:li:dataType:datahub.urn",
+      "typeQualifier": {
+        "allowedTypes": [
+          "urn:li:entityType:datahub.corpuser",
+          "urn:li:entityType:datahub.corpGroup"
+        ]
+      },
+      "cardinality": "SINGLE",
+      "entityTypes": [
+        "urn:li:entityType:datahub.dataset",
+        "urn:li:entityType:datahub.dataFlow",
+        "urn:li:entityType:datahub.dataJob"
+      ],
+      "immutable": false
+    }
+  }
+},
+{
+  "entityType": "structuredProperty",
+  "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.certifier",
+  "changeType": "UPSERT",
+  "aspectName": "propertyDefinition",
+  "aspect": {
+    "json": {
+      "qualifiedName": "io.acryl.dataManagement.certifier",
+      "displayName": "Person Certifying the asset",
+      "valueType": "urn:li:dataType:datahub.urn",
+      "cardinality": "SINGLE",
+      "entityTypes": [
+        "urn:li:entityType:datahub.dataset",
+        "urn:li:entityType:datahub.schemaField"
+      ],
+      "immutable": false
+    }
+  }
+},
+{
+  "entityType": "structuredProperty",
+  "entityUrn": "urn:li:structuredProperty:io.acryl.dataManagement.team",
+  "changeType": "UPSERT",
+  "aspectName": "propertyDefinition",
+  "aspect": {
+    "json": {
+      "qualifiedName": "io.acryl.dataManagement.team",
+      "displayName": "Management team",
+      "valueType": "urn:li:dataType:datahub.string",
+      "cardinality": "SINGLE",
+      "entityTypes": [
+        "urn:li:entityType:datahub.dataset"
+      ],
+      "immutable": false
+    }
+  }
+},
+{
+  "entityType": "structuredProperty",
+  "entityUrn": "urn:li:structuredProperty:projectNames",
+  "changeType": "UPSERT",
+  "aspectName": "propertyDefinition",
+  "aspect": {
+    "json": {
+      "qualifiedName": "projectNames",
+      "displayName": "Project names",
+      "valueType": "urn:li:dataType:datahub.string",
+      "allowedValues": [
+        {
+          "value": {
+            "string": "Tracking"
+          },
+          "description": "test value 1 for project"
+        },
+        {
+          "value": {
+            "string": "DataHub"
+          },
+          "description": "test value 2 for project"
+        }
+      ],
+      "cardinality": "MULTIPLE",
+      "entityTypes": [
+        "urn:li:entityType:datahub.dataset"
+      ],
+      "immutable": false
+    }
+  }
+},
+{
+  "entityType": "structuredProperty",
+  "entityUrn": "urn:li:structuredProperty:namespace",
+  "changeType": "UPSERT",
+  "aspectName": "propertyDefinition",
+  "aspect": {
+    "json": {
+      "qualifiedName": "namespace",
+      "displayName": "Namespace",
+      "valueType": "urn:li:dataType:datahub.string",
+      "cardinality": "SINGLE",
+      "entityTypes": [
+        "urn:li:entityType:datahub.dataset"
+      ],
+      "immutable": false
+    }
+  }
+}
+]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/unit/api/entities/structuredproperties/test_structuredproperties.py b/metadata-ingestion/tests/unit/api/entities/structuredproperties/test_structuredproperties.py
new file mode 100644
index 0000000000000..e96b7c1f98437
--- /dev/null
+++ b/metadata-ingestion/tests/unit/api/entities/structuredproperties/test_structuredproperties.py
@@ -0,0 +1,38 @@
+import pathlib
+
+import pydantic
+import pytest
+
+from datahub.api.entities.structuredproperties.structuredproperties import (
+    StructuredProperties,
+    TypeQualifierAllowedTypes,
+)
+from tests.test_helpers.mce_helpers import check_goldens_stream
+
+RESOURCE_DIR = pathlib.Path(__file__).parent
+
+
+def test_type_validation() -> None:
+    with pytest.raises(pydantic.ValidationError):
+        TypeQualifierAllowedTypes(allowed_types=["thisdoesnotexist"])
+
+    types = TypeQualifierAllowedTypes(allowed_types=["dataset"])
+    assert types.allowed_types == ["urn:li:entityType:datahub.dataset"]
+
+
+def test_structuredproperties_load(pytestconfig: pytest.Config) -> None:
+    example_properties_file = (
+        pytestconfig.rootpath
+        / "examples/structured_properties/structured_properties.yaml"
+    )
+
+    properties = StructuredProperties.from_yaml(str(example_properties_file))
+    mcps = []
+    for property in properties:
+        mcps.extend(property.generate_mcps())
+
+    check_goldens_stream(
+        pytestconfig,
+        mcps,
+        golden_path=RESOURCE_DIR / "example_structured_properties_golden.json",
+    )
diff --git a/metadata-ingestion/tests/unit/serde/test_codegen.py b/metadata-ingestion/tests/unit/serde/test_codegen.py
index 37ac35586950e..98d62d5643ff2 100644
--- a/metadata-ingestion/tests/unit/serde/test_codegen.py
+++ b/metadata-ingestion/tests/unit/serde/test_codegen.py
@@ -18,6 +18,7 @@
     UpstreamClass,
     _Aspect,
 )
+from datahub.utilities.urns._urn_base import URN_TYPES
 
 _UPDATE_ENTITY_REGISTRY = os.getenv("UPDATE_ENTITY_REGISTRY", "false").lower() == "true"
 ENTITY_REGISTRY_PATH = pathlib.Path(
@@ -165,3 +166,9 @@ def test_enum_options():
     # This is mainly a sanity check to ensure that it doesn't do anything too crazy.
     env_options = get_enum_options(FabricTypeClass)
     assert "PROD" in env_options
+
+
+def test_urn_types() -> None:
+    assert len(URN_TYPES) > 10
+    for checked_type in ["dataset", "dashboard", "dataFlow", "schemaField"]:
+        assert checked_type in URN_TYPES

From 2285436a62dcee0ab0c4e4104f5c984c9d8a7b96 Mon Sep 17 00:00:00 2001
From: Aseem Bansal
Date: Wed, 18 Dec 2024 17:50:38 +0530
Subject: [PATCH 3/3] fix(ingest/gc): more logging, error handling, explicit
 flag (#12124)

---
 .../src/datahub/ingestion/api/source.py       |  1 +
 .../datahub/ingestion/api/source_helpers.py   |  2 +-
 .../datahub/ingestion/source/gc/datahub_gc.py | 54 +++++++++----------
 .../source/gc/dataprocess_cleanup.py          | 52 ++++++++++++------
 .../source/gc/soft_deleted_entity_cleanup.py  |  5 ++
 5 files changed, 67 insertions(+), 47 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py
index c80da04e481a9..c3638635b19aa 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -184,6 +184,7 @@ def infos(self) -> LossyList[StructuredLogEntry]:
 
 
 @dataclass
 class SourceReport(Report):
+    event_not_produced_warn: bool = True
     events_produced: int = 0
     events_produced_per_sec: int = 0
diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
index 0c86e1cf47203..7791ea2797be3 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
@@ -150,7 +150,7 @@ def auto_workunit_reporter(report: "SourceReport", stream: Iterable[T]) -> Itera
         report.report_workunit(wu)
         yield wu
 
-    if report.events_produced == 0:
+    if report.event_not_produced_warn and report.events_produced == 0:
         report.warning(
             title="No metadata was produced by the source",
             message="Please check the source configuration, filters, and permissions.",
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
index 814f65ecb45cf..4eecbb4d9d717 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
@@ -65,18 +65,18 @@ class DataHubGcSourceConfig(ConfigModel):
         description="Sleep between truncation monitoring.",
     )
 
-    dataprocess_cleanup: Optional[DataProcessCleanupConfig] = Field(
-        default=None,
+    dataprocess_cleanup: DataProcessCleanupConfig = Field(
+        default_factory=DataProcessCleanupConfig,
         description="Configuration for data process cleanup",
     )
 
-    soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanupConfig] = Field(
-        default=None,
+    soft_deleted_entities_cleanup: SoftDeletedEntitiesCleanupConfig = Field(
+        default_factory=SoftDeletedEntitiesCleanupConfig,
         description="Configuration for soft deleted entities cleanup",
     )
 
-    execution_request_cleanup: Optional[DatahubExecutionRequestCleanupConfig] = Field(
-        default=None,
+    execution_request_cleanup: DatahubExecutionRequestCleanupConfig = Field(
+        default_factory=DatahubExecutionRequestCleanupConfig,
         description="Configuration for execution request cleanup",
     )
 
@@ -108,28 +108,22 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
         self.ctx = ctx
         self.config = config
         self.report = DataHubGcSourceReport()
+        self.report.event_not_produced_warn = False
         self.graph = ctx.require_graph("The DataHubGc source")
-        self.dataprocess_cleanup: Optional[DataProcessCleanup] = None
-        self.soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanup] = None
-        self.execution_request_cleanup: Optional[DatahubExecutionRequestCleanup] = None
-
-        if self.config.dataprocess_cleanup:
-            self.dataprocess_cleanup = DataProcessCleanup(
-                ctx, self.config.dataprocess_cleanup, self.report, self.config.dry_run
-            )
-        if self.config.soft_deleted_entities_cleanup:
-            self.soft_deleted_entities_cleanup = SoftDeletedEntitiesCleanup(
-                ctx,
-                self.config.soft_deleted_entities_cleanup,
-                self.report,
-                self.config.dry_run,
-            )
-        if self.config.execution_request_cleanup:
-            self.execution_request_cleanup = DatahubExecutionRequestCleanup(
-                config=self.config.execution_request_cleanup,
-                graph=self.graph,
-                report=self.report,
-            )
+        self.dataprocess_cleanup = DataProcessCleanup(
+            ctx, self.config.dataprocess_cleanup, self.report, self.config.dry_run
+        )
+        self.soft_deleted_entities_cleanup = SoftDeletedEntitiesCleanup(
+            ctx,
+            self.config.soft_deleted_entities_cleanup,
+            self.report,
+            self.config.dry_run,
+        )
+        self.execution_request_cleanup = DatahubExecutionRequestCleanup(
+            config=self.config.execution_request_cleanup,
+            graph=self.graph,
+            report=self.report,
+        )
 
     @classmethod
     def create(cls, config_dict, ctx):
@@ -153,19 +147,19 @@ def get_workunits_internal(
             self.truncate_indices()
         except Exception as e:
             self.report.failure("While trying to truncate indices ", exc=e)
-        if self.soft_deleted_entities_cleanup:
+        if self.config.soft_deleted_entities_cleanup.enabled:
             try:
                 self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
             except Exception as e:
                 self.report.failure(
                     "While trying to cleanup soft deleted entities ", exc=e
                 )
-        if self.execution_request_cleanup:
+        if self.config.execution_request_cleanup.enabled:
             try:
                 self.execution_request_cleanup.run()
             except Exception as e:
                 self.report.failure("While trying to cleanup execution request ", exc=e)
-        if self.dataprocess_cleanup:
+        if self.config.dataprocess_cleanup.enabled:
             try:
                 yield from self.dataprocess_cleanup.get_workunits_internal()
             except Exception as e:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
index 8aacf13cdb00f..6d16aaab2d798 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
@@ -98,6 +98,9 @@
 
 
 class DataProcessCleanupConfig(ConfigModel):
+    enabled: bool = Field(
+        default=True, description="Whether to do data process cleanup."
+    )
     retention_days: Optional[int] = Field(
         10,
         description="Number of days to retain metadata in DataHub",
@@ -371,17 +374,26 @@ def get_data_flows(self) -> Iterable[DataFlowEntity]:
         previous_scroll_id: Optional[str] = None
 
         while True:
-            result = self.ctx.graph.execute_graphql(
-                DATAFLOW_QUERY,
-                {
-                    "query": "*",
-                    "scrollId": scroll_id if scroll_id else None,
-                    "batchSize": self.config.batch_size,
-                },
-            )
+            result = None
+            try:
+                result = self.ctx.graph.execute_graphql(
+                    DATAFLOW_QUERY,
+                    {
+                        "query": "*",
+                        "scrollId": scroll_id if scroll_id else None,
+                        "batchSize": self.config.batch_size,
+                    },
+                )
+            except Exception as e:
+                self.report.failure(
+                    f"While trying to get dataflows with {scroll_id}", exc=e
+                )
+                break
+
             scrollAcrossEntities = result.get("scrollAcrossEntities")
             if not scrollAcrossEntities:
                 raise ValueError("Missing scrollAcrossEntities in response")
+            logger.info(f"Got {scrollAcrossEntities.get('count')} DataFlow entities")
 
             scroll_id = scrollAcrossEntities.get("nextScrollId")
             for flow in scrollAcrossEntities.get("searchResults"):
@@ -398,6 +410,8 @@ def get_data_flows(self) -> Iterable[DataFlowEntity]:
             previous_scroll_id = scroll_id
 
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
+        if not self.config.enabled:
+            return []
         assert self.ctx.graph
 
         dataFlows: Dict[str, DataFlowEntity] = {}
@@ -411,14 +425,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         deleted_jobs: int = 0
 
         while True:
-            result = self.ctx.graph.execute_graphql(
-                DATAJOB_QUERY,
-                {
-                    "query": "*",
-                    "scrollId": scroll_id if scroll_id else None,
-                    "batchSize": self.config.batch_size,
-                },
-            )
+            try:
+                result = self.ctx.graph.execute_graphql(
+                    DATAJOB_QUERY,
+                    {
+                        "query": "*",
+                        "scrollId": scroll_id if scroll_id else None,
+                        "batchSize": self.config.batch_size,
+                    },
+                )
+            except Exception as e:
+                self.report.failure(
+                    f"While trying to get data jobs with {scroll_id}", exc=e
+                )
+                break
             scrollAcrossEntities = result.get("scrollAcrossEntities")
             if not scrollAcrossEntities:
                 raise ValueError("Missing scrollAcrossEntities in response")
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
index bb4ab753543b7..93f004ab675ed 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
@@ -20,6 +20,9 @@
 
 
 class SoftDeletedEntitiesCleanupConfig(ConfigModel):
+    enabled: bool = Field(
+        default=True, description="Whether to do soft deletion cleanup."
+    )
     retention_days: Optional[int] = Field(
         10,
         description="Number of days to retain metadata in DataHub",
@@ -156,6 +159,8 @@ def delete_soft_deleted_entity(self, urn: str) -> None:
             self.delete_entity(urn)
 
     def cleanup_soft_deleted_entities(self) -> None:
+        if not self.config.enabled:
+            return
         assert self.ctx.graph
 
         start_time = time.time()
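
Reviewer note (not part of the patches above): a minimal usage sketch of the API split introduced in PATCH 2/3. Parsing and MCP generation are now graph-free; only emission needs a server connection. The YAML filename below is illustrative; the imports and calls mirror the patched modules and the updated CLI upsert command.

# Sketch only: "structured_properties.yaml" is a hypothetical local file.
from datahub.api.entities.structuredproperties.structuredproperties import (
    StructuredProperties,
)
from datahub.ingestion.graph.client import get_default_graph

# Build MCPs offline; no DataHubGraph is required for this step anymore.
properties = StructuredProperties.from_yaml("structured_properties.yaml")
mcps = [mcp for prop in properties for mcp in prop.generate_mcps()]

# Only emission talks to the server, as in the updated `properties upsert`.
with get_default_graph() as graph:
    for mcp in mcps:
        graph.emit_mcp(mcp)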