Skip to content

Commit

Permalink
Fix openlineage event emition
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Sep 12, 2023
1 parent ca3d9fa commit 101c2e5
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 20 deletions.
38 changes: 20 additions & 18 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import shutil
import signal
import tempfile
from attr import define
from pathlib import Path
from typing import Any, Callable, Literal, Sequence, TYPE_CHECKING

Expand Down Expand Up @@ -59,6 +60,13 @@
logger.exception(error)
is_openlineage_available = False

@define
class OperatorLineage: # type: ignore
inputs: list[str] = list()
outputs: list[str] = list()
run_facets: dict[str, str] = dict()
job_facets: dict[str, str] = dict()


class DbtLocalBaseOperator(DbtBaseOperator):
"""
Expand Down Expand Up @@ -305,7 +313,7 @@ def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]
DAG.bulk_write_to_db([self.dag], session=session)
session.commit()

def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage | None:
def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage:
"""
Collect the input, output, job and run facets for this operator.
It relies on the calculate_openlineage_events_completes having being called before.
Expand All @@ -324,30 +332,24 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope
openlineage_events_completes = self.openlineage_events_completes
elif hasattr(task_instance, "openlineage_events_completes"):
openlineage_events_completes = task_instance.openlineage_events_completes
if not openlineage_events_completes:
logger.warning("Unable to emit OpenLineage events since no events were created.")
return None
else:
logger.warning("Unable to emit OpenLineage events due to lack of data.")

if is_openlineage_available:
if openlineage_events_completes is not None:
for completed in openlineage_events_completes:
[inputs.append(input_) for input_ in completed.inputs if input_ not in inputs] # type: ignore
[outputs.append(output) for output in completed.outputs if output not in outputs] # type: ignore
run_facets = {**run_facets, **completed.run.facets}
job_facets = {**job_facets, **completed.job.facets}
else:
logger.warning("Unable to emit OpenLineage events since the necessary dependencies are not installed.")
return None

if inputs or outputs or run_facets or job_facets:
return OperatorLineage(
inputs=inputs,
outputs=outputs,
run_facets=run_facets,
job_facets=job_facets,
)
else:
logger.warning("Unable to emit OpenLineage events since the OperatorLineage is not available.")
return None
logger.warning("Unable to emit OpenLineage events due to lack of dependencies or data.")

return OperatorLineage(
inputs=inputs,
outputs=outputs,
run_facets=run_facets,
job_facets=job_facets,
)

def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None:
dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags)
Expand Down
7 changes: 5 additions & 2 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,11 @@ def test_run_operator_emits_events_without_openlineage_events_completes(caplog):
)
delattr(dbt_base_operator, "openlineage_events_completes")
facets = dbt_base_operator.get_openlineage_facets_on_complete(dbt_base_operator)
assert facets is None
log = "Unable to emit OpenLineage events since no events were created."
assert facets.inputs == []
assert facets.outputs == []
assert facets.run_facets == {}
assert facets.job_facets == {}
log = "Unable to emit OpenLineage events due to lack of dependencies or data."
assert log in caplog.text


Expand Down

0 comments on commit 101c2e5

Please sign in to comment.