From 18b2a2295874c4d2643d5a9b0d89c5fbadc387ea Mon Sep 17 00:00:00 2001 From: Maxim Martynov Date: Mon, 27 May 2024 11:17:01 +0300 Subject: [PATCH] Fix OpenLineage ingestor (#16416) * Fix OpenLineage ingestor * py format --------- Co-authored-by: ulixius9 --- .../ingestion/source/pipeline/openlineage/metadata.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index 2e1b63a36a57..9804cce2690e 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -77,7 +77,9 @@ class OpenlineageSource(PipelineServiceSource): """ @classmethod - def create(cls, config_dict, metadata: OpenMetadata): + def create( + cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None + ): """Create class instance""" config: WorkflowSource = WorkflowSource.parse_obj(config_dict) connection: OpenLineageConnection = config.serviceConnection.__root__.config @@ -379,7 +381,7 @@ def yield_pipeline( {json.dumps(pipeline_details.run_facet, indent=4).strip()}```""" request = CreatePipelineRequest( name=pipeline_name, - service=self.context.pipeline_service, + service=self.context.get().pipeline_service, description=description, ) @@ -433,8 +435,8 @@ def yield_pipeline_lineage_details( pipeline_fqn = fqn.build( metadata=self.metadata, entity_type=Pipeline, - service_name=self.context.pipeline_service, - pipeline_name=self.context.pipeline, + service_name=self.context.get().pipeline_service, + pipeline_name=self.context.get().pipeline, ) pipeline_entity = self.metadata.get_by_name(entity=Pipeline, fqn=pipeline_fqn)