Skip to content

Commit

Permalink
fix(ingest/mssql): add container dataflow/ datajob entities (datahub-…
Browse files Browse the repository at this point in the history
  • Loading branch information
sgomezvillamor authored Dec 22, 2024
1 parent 8350a4e commit 494c522
Show file tree
Hide file tree
Showing 6 changed files with 795 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
make_data_platform_urn,
make_dataplatform_instance_urn,
)
from datahub.emitter.mcp_builder import DatabaseKey
from datahub.metadata.schema_classes import (
ContainerClass,
DataFlowInfoClass,
DataJobInfoClass,
DataJobInputOutputClass,
Expand Down Expand Up @@ -210,6 +212,18 @@ def as_datajob_info_aspect(self) -> DataJobInfoClass:
status=self.status,
)

@property
def as_container_aspect(self) -> ContainerClass:
databaseKey = DatabaseKey(
platform=self.entity.flow.orchestrator,
instance=self.entity.flow.platform_instance
if self.entity.flow.platform_instance
else None,
env=self.entity.flow.env,
database=self.entity.flow.db,
)
return ContainerClass(container=databaseKey.as_urn())

@property
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
if self.entity.flow.platform_instance:
Expand Down Expand Up @@ -257,6 +271,18 @@ def as_dataflow_info_aspect(self) -> DataFlowInfoClass:
externalUrl=self.external_url,
)

@property
def as_container_aspect(self) -> ContainerClass:
databaseKey = DatabaseKey(
platform=self.entity.orchestrator,
instance=self.entity.platform_instance
if self.entity.platform_instance
else None,
env=self.entity.env,
database=self.entity.db,
)
return ContainerClass(container=databaseKey.as_urn())

@property
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
if self.entity.platform_instance:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,11 @@ def construct_job_workunits(
aspect=data_job.as_datajob_info_aspect,
).as_workunit()

yield MetadataChangeProposalWrapper(
entityUrn=data_job.urn,
aspect=data_job.as_container_aspect,
).as_workunit()

data_platform_instance_aspect = data_job.as_maybe_platform_instance_aspect
if data_platform_instance_aspect:
yield MetadataChangeProposalWrapper(
Expand All @@ -662,6 +667,11 @@ def construct_flow_workunits(
aspect=data_flow.as_dataflow_info_aspect,
).as_workunit()

yield MetadataChangeProposalWrapper(
entityUrn=data_flow.urn,
aspect=data_flow.as_container_aspect,
).as_workunit()

data_platform_instance_aspect = data_flow.as_maybe_platform_instance_aspect
if data_platform_instance_aspect:
yield MetadataChangeProposalWrapper(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,43 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63",
"urn": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
Expand All @@ -113,11 +150,11 @@
"aspect": {
"json": {
"customProperties": {
"job_id": "c2d77890-83ba-435f-879b-1c77fa38dd47",
"job_id": "ab960f9d-30f3-4ced-b558-4f9b6671b6dd",
"job_name": "Weekly Demo Data Backup",
"description": "No description available.",
"date_created": "2024-12-05 16:44:43.910000",
"date_modified": "2024-12-05 16:44:44.043000",
"date_created": "2024-12-20 15:15:24.483000",
"date_modified": "2024-12-20 15:15:24.653000",
"step_id": "1",
"step_name": "Set database to read only",
"subsystem": "TSQL",
Expand All @@ -136,6 +173,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
Expand All @@ -154,6 +207,27 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63",
"urn": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:7da983a1581c33cce8a106587b150f02",
Expand Down Expand Up @@ -2103,8 +2177,8 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"view_definition": "CREATE VIEW Foo.PersonsView AS SELECT * FROM Foo.Persons;\n",
"is_view": "True"
"is_view": "True",
"view_definition": "CREATE VIEW Foo.PersonsView AS SELECT * FROM Foo.Persons;\n"
},
"name": "PersonsView",
"tags": []
Expand Down Expand Up @@ -2269,6 +2343,43 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63",
"urn": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
Expand All @@ -2282,8 +2393,8 @@
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"input parameters": "['@ID']",
"parameter @ID": "{'type': 'int'}",
"date_created": "2024-12-05 16:44:43.800000",
"date_modified": "2024-12-05 16:44:43.800000"
"date_created": "2024-12-20 15:15:24.290000",
"date_modified": "2024-12-20 15:15:24.290000"
},
"externalUrl": "",
"name": "DemoData.Foo.Proc.With.SpecialChar",
Expand All @@ -2298,6 +2409,43 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63",
"urn": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
Expand All @@ -2310,8 +2458,8 @@
"depending_on_procedure": "{}",
"code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n",
"input parameters": "[]",
"date_created": "2024-12-05 16:44:43.803000",
"date_modified": "2024-12-05 16:44:43.803000"
"date_created": "2024-12-20 15:15:24.300000",
"date_modified": "2024-12-20 15:15:24.300000"
},
"externalUrl": "",
"name": "DemoData.Foo.NewProc",
Expand All @@ -2326,6 +2474,43 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63",
"urn": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:250ce23f940485303fa5e5d4f5194975",
Expand Down Expand Up @@ -4427,8 +4612,8 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"view_definition": "CREATE VIEW FooNew.View1 AS\nSELECT LastName, FirstName\nFROM FooNew.PersonsNew\nWHERE Age > 18\n",
"is_view": "True"
"is_view": "True",
"view_definition": "CREATE VIEW FooNew.View1 AS\nSELECT LastName, FirstName\nFROM FooNew.PersonsNew\nWHERE Age > 18\n"
},
"name": "View1",
"tags": []
Expand Down
Loading

0 comments on commit 494c522

Please sign in to comment.