From e963047dd08810a31e7e5117639ad4f5b4e0d247 Mon Sep 17 00:00:00 2001 From: dushayntAW <158567391+dushayntAW@users.noreply.github.com> Date: Thu, 7 Mar 2024 14:44:06 +0530 Subject: [PATCH] fix(ingest/unity): creating group urn in case of group (#9951) --- .../src/datahub/ingestion/source/unity/proxy.py | 10 ++++++++++ .../src/datahub/ingestion/source/unity/source.py | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py index b414f3f188c23..20aa10305fa8f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py @@ -187,6 +187,16 @@ def service_principals(self) -> Iterable[ServicePrincipal]: if optional_sp: yield optional_sp + def groups(self): + """ + fetch the list of the groups belongs to the workspace, using the workspace client + create the list of group's display name, iterating through the list of groups fetched by the workspace client + """ + group_list: List[Optional[str]] = [] + for group in self._workspace_client.groups.list(): + group_list.append(group.display_name) + return group_list + def workspace_notebooks(self) -> Iterable[Notebook]: for obj in self._workspace_client.workspace.list("/", recursive=True): if obj.object_type == ObjectType.NOTEBOOK and obj.object_id and obj.path: diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index ac77cd7e5e2be..ad5a75c4d73d4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -9,6 +9,7 @@ make_dataplatform_instance_urn, make_dataset_urn_with_platform_instance, make_domain_urn, + make_group_urn, make_schema_field_urn, make_user_urn, ) @@ -184,6 +185,7 @@ def __init__(self, ctx: PipelineContext, config: UnityCatalogSourceConfig): # Global map of service principal application id -> ServicePrincipal self.service_principals: Dict[str, ServicePrincipal] = {} + self.groups: List[str] = [] # Global set of table refs self.table_refs: Set[TableReference] = set() self.view_refs: Set[TableReference] = set() @@ -256,6 +258,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: if self.config.include_ownership: self.report.report_ingestion_stage_start("Ingest service principals") self.build_service_principal_map() + self.build_groups_map() if self.config.include_notebooks: self.report.report_ingestion_stage_start("Ingest notebooks") yield from self.process_notebooks() @@ -315,6 +318,12 @@ def build_service_principal_map(self) -> None: "service-principals", f"Unable to fetch service principals: {e}" ) + def build_groups_map(self) -> None: + try: + self.groups += self.unity_catalog_api_proxy.groups() + except Exception as e: + self.report.report_warning("groups", f"Unable to fetch groups: {e}") + def process_notebooks(self) -> Iterable[MetadataWorkUnit]: for notebook in self.unity_catalog_api_proxy.workspace_notebooks(): if not self.config.notebook_pattern.allowed(notebook.path): @@ -600,6 +609,8 @@ def _get_domain_aspect(self, dataset_name: str) -> Optional[DomainsClass]: def get_owner_urn(self, user: Optional[str]) -> Optional[str]: if self.config.include_ownership and user is not None: + if user in self.groups: + return make_group_urn(user) return self.gen_user_urn(user) return None