Skip to content

Commit

Permalink
debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Dec 20, 2024
1 parent bc98126 commit 41a9293
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
20 changes: 10 additions & 10 deletions cosmos/listeners/dag_run_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ def uses_cosmos(dag: DAG) -> bool:

@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str) -> None:
logger.info("The on_dag_run_success was called")

logger.info(f"dir: {dir(dag_run.dag)}")
logger.info("The on_dag_run_success was called")
# The following is an airflow.serialization.serialized_objects.SerializedDAG instance
serialized_dag = dag_run.get_dag()
logger.info(f" are they equal? {serialized_dag == dag_run.dag}")

logger.info(f"dir: {dir(serialized_dag)}")
logger.info(f"__dict__: {serialized_dag.__dict__}")
logger.info(f"1: {serialized_dag.fileloc}")
logger.info(f"2:{serialized_dag.filepath}")
logger.info(f"3: {serialized_dag.task_dict}")
Expand All @@ -64,21 +66,19 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None:

dag_bag = DagBag(dag_folder=serialized_dag.fileloc, include_examples=False)
dag = dag_bag.get_dag(dag_run.dag_id)
logger.info(f"dag: {dag}")

if not uses_cosmos(dag):
if not uses_cosmos(serialized_dag):
logger.info("The DAG does not use Cosmos")
logger.info(f"5: {serialized_dag.deserialize()}")
return

logger.info(f"5: {serialized_dag.deserialize()}")

additional_telemetry_metrics = {
"dag_hash": dag_run.dag_hash,
"status": EventStatus.SUCCESS,
"task_count": len(dag.task_ids),
"cosmos_task_count": total_cosmos_tasks(dag),
"cosmos_task_groups_count": total_cosmos_task_groups(dag),
"is_cosmos_dag": is_cosmos_dag(dag),
"task_count": len(serialized_dag.task_ids),
"cosmos_task_count": total_cosmos_tasks(serialized_dag),
"cosmos_task_groups_count": total_cosmos_task_groups(serialized_dag),
"is_cosmos_dag": is_cosmos_dag(serialized_dag),
}

telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)
Expand Down
5 changes: 3 additions & 2 deletions tests/perf/test_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ def generate_project(

yield project_path
finally:
pass
# clean up the models in the project_path / models directory
for model in models_dir.iterdir():
model.unlink()
# for model in models_dir.iterdir():
# model.unlink()


@pytest.mark.perf
Expand Down

0 comments on commit 41a9293

Please sign in to comment.