Skip to content

Commit

Permalink
fix(ingest/unity): creating group urn in case of group (datahub-proje…
Browse files Browse the repository at this point in the history
  • Loading branch information
dushayntAW authored Mar 7, 2024
1 parent 8047d0b commit e963047
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
10 changes: 10 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ def service_principals(self) -> Iterable[ServicePrincipal]:
if optional_sp:
yield optional_sp

def groups(self) -> List[str]:
    """Return the display names of the groups known to the workspace.

    Groups are listed through the workspace client. A group whose
    ``display_name`` is ``None`` is skipped, so the result contains only
    real strings and can be safely merged into a ``List[str]`` and used
    for membership checks against owner names.

    Returns:
        The display names of all listed groups that have one, in the
        order the workspace client yields them.
    """
    return [
        group.display_name
        for group in self._workspace_client.groups.list()
        # A nameless group can never match an owner string; drop it rather
        # than leaking None into a list typed as List[str].
        if group.display_name is not None
    ]

def workspace_notebooks(self) -> Iterable[Notebook]:
for obj in self._workspace_client.workspace.list("/", recursive=True):
if obj.object_type == ObjectType.NOTEBOOK and obj.object_id and obj.path:
Expand Down
11 changes: 11 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/unity/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
make_group_urn,
make_schema_field_urn,
make_user_urn,
)
Expand Down Expand Up @@ -184,6 +185,7 @@ def __init__(self, ctx: PipelineContext, config: UnityCatalogSourceConfig):

# Global map of service principal application id -> ServicePrincipal
self.service_principals: Dict[str, ServicePrincipal] = {}
self.groups: List[str] = []
# Global set of table refs
self.table_refs: Set[TableReference] = set()
self.view_refs: Set[TableReference] = set()
Expand Down Expand Up @@ -256,6 +258,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if self.config.include_ownership:
self.report.report_ingestion_stage_start("Ingest service principals")
self.build_service_principal_map()
self.build_groups_map()
if self.config.include_notebooks:
self.report.report_ingestion_stage_start("Ingest notebooks")
yield from self.process_notebooks()
Expand Down Expand Up @@ -315,6 +318,12 @@ def build_service_principal_map(self) -> None:
"service-principals", f"Unable to fetch service principals: {e}"
)

def build_groups_map(self) -> None:
    """Append the group names fetched from the API proxy to ``self.groups``.

    Best-effort: any exception raised while fetching or appending is
    reported as a warning under the "groups" key instead of propagating.
    """
    try:
        known_groups = self.groups
        known_groups.extend(self.unity_catalog_api_proxy.groups())
    except Exception as err:
        warning_text = f"Unable to fetch groups: {err}"
        self.report.report_warning("groups", warning_text)

def process_notebooks(self) -> Iterable[MetadataWorkUnit]:
for notebook in self.unity_catalog_api_proxy.workspace_notebooks():
if not self.config.notebook_pattern.allowed(notebook.path):
Expand Down Expand Up @@ -600,6 +609,8 @@ def _get_domain_aspect(self, dataset_name: str) -> Optional[DomainsClass]:

def get_owner_urn(self, user: Optional[str]) -> Optional[str]:
    """Resolve an owner name to a URN, or ``None`` when no owner applies.

    Returns ``None`` if ownership ingestion is disabled in the config or
    if ``user`` is ``None``. Otherwise a name found in ``self.groups``
    yields a group URN, and any other name yields a user URN via
    ``self.gen_user_urn``.
    """
    if not self.config.include_ownership or user is None:
        return None
    # An owner name that matches a collected group name is a group, not a user.
    owner_is_group = user in self.groups
    return make_group_urn(user) if owner_is_group else self.gen_user_urn(user)

Expand Down

0 comments on commit e963047

Please sign in to comment.