From 6068971c5a6c0988a14867401b2727ecc0b83dfd Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Mon, 11 Sep 2023 10:32:24 +0100 Subject: [PATCH 1/7] Fix issue reported by the community in slack: https://apache-airflow.slack.com/archives/C059CC42E9W/p1694212778764869 --- cosmos/operators/local.py | 30 +++++++++++++++++------------- tests/operators/test_local.py | 14 ++++++++++++++ 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 69bc1a78c..155a13bc0 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -305,7 +305,7 @@ def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset] DAG.bulk_write_to_db([self.dag], session=session) session.commit() - def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage: + def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage | None: """ Collect the input, output, job and run facets for this operator. It relies on the calculate_openlineage_events_completes having being called before. @@ -315,18 +315,22 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope run_facets: dict[str, Any] = {} job_facets: dict[str, Any] = {} - for completed in task_instance.openlineage_events_completes: - [inputs.append(input_) for input_ in completed.inputs if input_ not in inputs] # type: ignore - [outputs.append(output) for output in completed.outputs if output not in outputs] # type: ignore - run_facets = {**run_facets, **completed.run.facets} - job_facets = {**job_facets, **completed.job.facets} - - return OperatorLineage( - inputs=inputs, - outputs=outputs, - run_facets=run_facets, - job_facets=job_facets, - ) + if is_openlineage_available and hasattr(task_instance, "openlineage_events_completes"): + for completed in task_instance.openlineage_events_completes: + [inputs.append(input_) for input_ in completed.inputs if input_ not in inputs] # type: ignore + [outputs.append(output) for output in completed.outputs if output not in outputs] # type: ignore + run_facets = {**run_facets, **completed.run.facets} + job_facets = {**job_facets, **completed.job.facets} + + return OperatorLineage( + inputs=inputs, + outputs=outputs, + run_facets=run_facets, + job_facets=job_facets, + ) + else: + logger.warning("Unable to emit OpenLineage events since dependencies are not installed.") + return None def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None: dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 7d00d88c8..ccf17d2a5 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -200,6 +200,20 @@ class MockEvent: assert facets.job_facets == {"d": 4} +def test_run_operator_emits_events_without_openlineage_events_completes(caplog): + dbt_base_operator = DbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + should_store_compiled_sql=False, + ) + delattr(dbt_base_operator, "openlineage_events_completes") + facets = dbt_base_operator.get_openlineage_facets_on_complete(dbt_base_operator) + assert facets is None + log = "cosmos.operators.local:local.py:332 Unable to emit OpenLineage events since dependencies are not installed." + assert log in caplog.text + + def test_store_compiled_sql() -> None: dbt_base_operator = DbtLocalBaseOperator( profile_config=profile_config, From 2afc6438419c86ef7dd1de7cf21a8a68e7bfc26b Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Mon, 11 Sep 2023 16:30:31 +0100 Subject: [PATCH 2/7] Fix CI --- cosmos/operators/local.py | 31 ++++++++++++++++++++----------- tests/operators/test_local.py | 7 +++++-- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 155a13bc0..62d7538da 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -305,32 +305,41 @@ def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset] DAG.bulk_write_to_db([self.dag], session=session) session.commit() - def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage | None: + def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage: """ Collect the input, output, job and run facets for this operator. It relies on the calculate_openlineage_events_completes having being called before. + + This method is called by Openlineage even if `execute` fails, because `get_openlineage_facets_on_failure` + is not implemented. """ + inputs = [] outputs = [] run_facets: dict[str, Any] = {} job_facets: dict[str, Any] = {} - if is_openlineage_available and hasattr(task_instance, "openlineage_events_completes"): - for completed in task_instance.openlineage_events_completes: + openlineage_events_completes = None + if hasattr(self, "openlineage_events_completes"): + openlineage_events_completes = self.openlineage_events_completes + elif hasattr(task_instance, "openlineage_events_completes"): + openlineage_events_completes = task_instance.openlineage_events_completes + + if is_openlineage_available and openlineage_events_completes: + for completed in openlineage_events_completes: [inputs.append(input_) for input_ in completed.inputs if input_ not in inputs] # type: ignore [outputs.append(output) for output in completed.outputs if output not in outputs] # type: ignore run_facets = {**run_facets, **completed.run.facets} job_facets = {**job_facets, **completed.job.facets} - - return OperatorLineage( - inputs=inputs, - outputs=outputs, - run_facets=run_facets, - job_facets=job_facets, - ) else: logger.warning("Unable to emit OpenLineage events since dependencies are not installed.") - return None + + return OperatorLineage( + inputs=inputs, + outputs=outputs, + run_facets=run_facets, + job_facets=job_facets, + ) def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None: dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index ccf17d2a5..835ad19ad 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -209,8 +209,11 @@ def test_run_operator_emits_events_without_openlineage_events_completes(caplog): ) delattr(dbt_base_operator, "openlineage_events_completes") facets = dbt_base_operator.get_openlineage_facets_on_complete(dbt_base_operator) - assert facets is None - log = "cosmos.operators.local:local.py:332 Unable to emit OpenLineage events since dependencies are not installed." + assert facets.inputs == [] + assert facets.outputs == [] + assert facets.run_facets == {} + assert facets.job_facets == {} + log = "Unable to emit OpenLineage events since dependencies are not installed" assert log in caplog.text From dcbde93d9c8b84ccfe816fac27f9c0115725d470 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Mon, 11 Sep 2023 16:45:23 +0100 Subject: [PATCH 3/7] Fix implementation --- cosmos/operators/local.py | 28 ++++++++++++++++++---------- tests/operators/test_local.py | 7 ++----- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 62d7538da..a4965dc63 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -305,7 +305,7 @@ def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset] DAG.bulk_write_to_db([self.dag], session=session) session.commit() - def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage: + def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage | None: """ Collect the input, output, job and run facets for this operator. It relies on the calculate_openlineage_events_completes having being called before. @@ -324,22 +324,30 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope openlineage_events_completes = self.openlineage_events_completes elif hasattr(task_instance, "openlineage_events_completes"): openlineage_events_completes = task_instance.openlineage_events_completes + if not openlineage_events_completes: + logger.warning("Unable to emit OpenLineage events since no events were created.") + return None - if is_openlineage_available and openlineage_events_completes: + if is_openlineage_available: for completed in openlineage_events_completes: [inputs.append(input_) for input_ in completed.inputs if input_ not in inputs] # type: ignore [outputs.append(output) for output in completed.outputs if output not in outputs] # type: ignore run_facets = {**run_facets, **completed.run.facets} job_facets = {**job_facets, **completed.job.facets} else: - logger.warning("Unable to emit OpenLineage events since dependencies are not installed.") - - return OperatorLineage( - inputs=inputs, - outputs=outputs, - run_facets=run_facets, - job_facets=job_facets, - ) + logger.warning("Unable to emit OpenLineage events since the necessary dependencies are not installed.") + return None + + if inputs or outputs or run_facets or job_facets: + return OperatorLineage( + inputs=inputs, + outputs=outputs, + run_facets=run_facets, + job_facets=job_facets, + ) + else: + logger.warning("Unable to emit OpenLineage events since the OperatorLineage is not available.") + return None def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None: dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 835ad19ad..74baa2491 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -209,11 +209,8 @@ def test_run_operator_emits_events_without_openlineage_events_completes(caplog): ) delattr(dbt_base_operator, "openlineage_events_completes") facets = dbt_base_operator.get_openlineage_facets_on_complete(dbt_base_operator) - assert facets.inputs == [] - assert facets.outputs == [] - assert facets.run_facets == {} - assert facets.job_facets == {} - log = "Unable to emit OpenLineage events since dependencies are not installed" + assert facets is None + log = "Unable to emit OpenLineage events since no events were created." assert log in caplog.text From 34c171808c9e126f01cfd097808d93c869ce143e Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Mon, 11 Sep 2023 21:52:44 +0100 Subject: [PATCH 4/7] Fix openlineage event emition --- cosmos/operators/local.py | 38 ++++++++++++++++++----------------- tests/operators/test_local.py | 7 +++++-- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index a4965dc63..60aeb71e8 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -4,6 +4,7 @@ import shutil import signal import tempfile +from attr import define from pathlib import Path from typing import Any, Callable, Literal, Sequence, TYPE_CHECKING @@ -59,6 +60,13 @@ logger.exception(error) is_openlineage_available = False + @define + class OperatorLineage: # type: ignore + inputs: list[str] = list() + outputs: list[str] = list() + run_facets: dict[str, str] = dict() + job_facets: dict[str, str] = dict() + class DbtLocalBaseOperator(DbtBaseOperator): """ @@ -305,7 +313,7 @@ def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset] DAG.bulk_write_to_db([self.dag], session=session) session.commit() - def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage | None: + def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage: """ Collect the input, output, job and run facets for this operator. It relies on the calculate_openlineage_events_completes having being called before. @@ -324,30 +332,24 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope openlineage_events_completes = self.openlineage_events_completes elif hasattr(task_instance, "openlineage_events_completes"): openlineage_events_completes = task_instance.openlineage_events_completes - if not openlineage_events_completes: - logger.warning("Unable to emit OpenLineage events since no events were created.") - return None + else: + logger.warning("Unable to emit OpenLineage events due to lack of data.") - if is_openlineage_available: + if openlineage_events_completes is not None: for completed in openlineage_events_completes: [inputs.append(input_) for input_ in completed.inputs if input_ not in inputs] # type: ignore [outputs.append(output) for output in completed.outputs if output not in outputs] # type: ignore run_facets = {**run_facets, **completed.run.facets} job_facets = {**job_facets, **completed.job.facets} else: - logger.warning("Unable to emit OpenLineage events since the necessary dependencies are not installed.") - return None - - if inputs or outputs or run_facets or job_facets: - return OperatorLineage( - inputs=inputs, - outputs=outputs, - run_facets=run_facets, - job_facets=job_facets, - ) - else: - logger.warning("Unable to emit OpenLineage events since the OperatorLineage is not available.") - return None + logger.warning("Unable to emit OpenLineage events due to lack of dependencies or data.") + + return OperatorLineage( + inputs=inputs, + outputs=outputs, + run_facets=run_facets, + job_facets=job_facets, + ) def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None: dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 74baa2491..4a94d4d8a 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -209,8 +209,11 @@ def test_run_operator_emits_events_without_openlineage_events_completes(caplog): ) delattr(dbt_base_operator, "openlineage_events_completes") facets = dbt_base_operator.get_openlineage_facets_on_complete(dbt_base_operator) - assert facets is None - log = "Unable to emit OpenLineage events since no events were created." + assert facets.inputs == [] + assert facets.outputs == [] + assert facets.run_facets == {} + assert facets.job_facets == {} + log = "Unable to emit OpenLineage events due to lack of dependencies or data." assert log in caplog.text From 93defee6a27deb81d3a1da754a2816a9cbb58684 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 12 Sep 2023 10:35:40 +0100 Subject: [PATCH 5/7] Add attr as a dependency --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 66e8d2852..e4460f223 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ classifiers = [ ] dependencies = [ # Airflow & Pydantic issue: https://github.com/apache/airflow/issues/32311 + "attrs", "pydantic>=1.10.0,<2.0.0", "apache-airflow>=2.3.0", "importlib-metadata; python_version < '3.8'", @@ -133,7 +134,6 @@ dependencies = [ "apache-airflow-providers-cncf-kubernetes>=5.1.1,<7.3.0", "types-PyYAML", "types-attrs", - "attrs", "types-requests", "types-python-dateutil", "apache-airflow" From d0f4d08b4f04b5f792b2999c8ad343ff631e2c7c Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 13 Sep 2023 10:45:03 +0100 Subject: [PATCH 6/7] Make openlineage package optional --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index e4460f223..e7bce01be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,6 @@ dependencies = [ "Jinja2>=3.0.0", "typing-extensions; python_version < '3.8'", "virtualenv", - "openlineage-integration-common", ] [project.optional-dependencies] @@ -77,6 +76,7 @@ dbt-spark = [ "dbt-spark<=1.5.4", ] openlineage = [ + "openlineage-integration-common", "openlineage-airflow", ] all = [ From dd7df64ef43cdbbbb060d8d149e27b35b141f821 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 14 Sep 2023 11:19:59 +0100 Subject: [PATCH 7/7] Address CR feedback --- cosmos/operators/local.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 60aeb71e8..aae04ee2a 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -54,8 +54,7 @@ from openlineage.airflow.extractors.base import OperatorLineage except (ImportError, ModuleNotFoundError) as error: logger.warning( - "To enable emitting Openlineage events. In order to use openlineage, upgrade to Airflow 2.7 or " - "install astronomer-cosmos[openlineage]." + "To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]." ) logger.exception(error) is_openlineage_available = False @@ -68,6 +67,12 @@ class OperatorLineage: # type: ignore job_facets: dict[str, str] = dict() +try: + LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") +except airflow.exceptions.AirflowConfigException: + LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) + + class DbtLocalBaseOperator(DbtBaseOperator): """ Executes a dbt core cli command locally. @@ -259,15 +264,9 @@ def calculate_openlineage_events_completes( for key, value in env.items(): os.environ[key] = str(value) - lineage_namespace = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) - try: - lineage_namespace = conf.get("openlineage", "namespace") - except airflow.exceptions.AirflowConfigException: - pass - openlineage_processor = DbtLocalArtifactProcessor( producer=OPENLINEAGE_PRODUCER, - job_namespace=lineage_namespace, + job_namespace=LINEAGE_NAMESPACE, project_dir=project_dir, profile_name=self.profile_config.profile_name, target=self.profile_config.target_name, @@ -278,8 +277,8 @@ def calculate_openlineage_events_completes( try: events = openlineage_processor.parse() self.openlineage_events_completes = events.completes - except (FileNotFoundError, NotImplementedError) as error: - logger.exception(error) + except (FileNotFoundError, NotImplementedError): + logger.debug("Unable to parse OpenLineage events", stack_info=True) def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: """ @@ -333,7 +332,7 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope elif hasattr(task_instance, "openlineage_events_completes"): openlineage_events_completes = task_instance.openlineage_events_completes else: - logger.warning("Unable to emit OpenLineage events due to lack of data.") + logger.info("Unable to emit OpenLineage events due to lack of data.") if openlineage_events_completes is not None: for completed in openlineage_events_completes: @@ -342,7 +341,7 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope run_facets = {**run_facets, **completed.run.facets} job_facets = {**job_facets, **completed.job.facets} else: - logger.warning("Unable to emit OpenLineage events due to lack of dependencies or data.") + logger.info("Unable to emit OpenLineage events due to lack of dependencies or data.") return OperatorLineage( inputs=inputs,