bugfix: DAG import error for TaskGroup #35

Merged 2 commits into treeverse:main on Aug 15, 2023

FredrikBakken
Contributor

@FredrikBakken FredrikBakken commented Sep 13, 2022

For further explanation about this bugfix, please see #34.

Closes #34, Closes #44, Closes #69

Signed-off-by: Fredrik Bakken <[email protected]>
@FredrikBakken FredrikBakken changed the title from "Fix DAG import error message" to "Fix DAG import error" on Sep 13, 2022
@talSofer
Contributor

Hi @FredrikBakken, thanks for your contribution! Happy to have you on-board!
We will comment on the pr next week :)

@arielshaqed
Contributor

Hi @FredrikBakken !

Thanks for this PR, it's much appreciated. I am by no means an expert in Airflow, but this statement from the docs confuses me (and is probably why we have this assumption in the code):

> A task must include or inherit the arguments task_id and owner, otherwise Airflow will raise an exception.

Can you share some context from your calling code, please? Also, would it be acceptable to you to raise an error if task_id is not set when creating these tasks?

@arielshaqed arielshaqed self-requested a review September 19, 2022 09:56
Contributor

@arielshaqed arielshaqed left a comment

Hi again @FredrikBakken,

Following #36 and its fix in #37, it turns out that currently our CI fails on this PR. Could you please rebase on top of it so we can test? (I cannot do it in practice because it's on a fork -- so if you cannot I shall merely do so manually...).

@arielshaqed
Contributor

@FredrikBakken: once rebased (#38), this passes. 🎇

Gentle reminder: The blocker to merging is this comment regarding its necessity in Airflow DAGs. From the docs I understand close to the exact opposite: that every task must have a task_id. If you could point me at calling code, or at an example from tasks in other providers, that would be very helpful!

Thanks again :-)

@arielshaqed
Contributor

Closing due to lack of traction. @FredrikBakken thanks for your work on this! Of course we will happily reopen when you have more time for this.

@FredrikBakken
Contributor Author

FredrikBakken commented Jun 5, 2023

@arielshaqed, sorry for the late response to this PR. It has been a busy couple of months and I have not had time to keep up to date on everything. I am now back to using LakeFS (with Airflow) in some local tests and have encountered this issue again on airflow-provider-lakefs==0.46.1. This is what the error message looks like in Airflow:

[Screenshot: airflow_error]

My Airflow task looks as follows:

task_fetch_dataset = LakeFSUploadOperator(
    task_id='upload_file',
    repo=default_args.get('repo_bronze'),
    branch=f'ssb_dataset_{date}',
    path='dataset',
    content=NamedStringIO(
        content="",
        name='dataset.json'))

I agree that task_id should exist for all task operators, but I suspect that these operators inherit this parameter from the BaseOperator. This is e.g. defined here for LakeFSUploadOperator: https://github.com/treeverse/airflow-provider-lakeFS/blob/main/lakefs_provider/operators/upload_operator.py#LL9C42-L9C42

I also took a look at the official BashOperator from Airflow, which does not define task_id directly: https://github.com/apache/airflow/blob/main/airflow/operators/bash.py#L138-L148. Instead, it seems like this operator inherits this value from the BaseOperator: https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py#L212
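
The inheritance point can be illustrated with a minimal sketch in plain Python. The classes below are hypothetical stand-ins, not Airflow's actual BaseOperator or BashOperator; they only show how a subclass can accept task_id without naming it in its own signature, by forwarding **kwargs to the base class.

```python
# Simplified stand-ins for illustration only; these are NOT Airflow classes.
class SketchBaseOperator:
    """Hypothetical base class that owns the task_id parameter."""

    def __init__(self, task_id, owner="airflow", **kwargs):
        self.task_id = task_id
        self.owner = owner


class SketchBashOperator(SketchBaseOperator):
    """Subclass that never names task_id in its own signature."""

    def __init__(self, bash_command, **kwargs):
        # task_id travels inside **kwargs to the base class.
        super().__init__(**kwargs)
        self.bash_command = bash_command


op = SketchBashOperator(bash_command="echo hello", task_id="say_hello")
print(op.task_id)  # say_hello

# Omitting task_id still fails, because the base class requires it.
try:
    SketchBashOperator(bash_command="echo hello")
    missing_id_rejected = False
except TypeError:
    missing_id_rejected = True
```

In this sketch the missing task_id surfaces as a plain Python TypeError; the exception Airflow itself raises may differ.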

Let me know if there is anything you want me to change in the PR 🤝

@arielshaqed
Contributor

Sorry for not addressing this sooner -- I failed to see the comment, because I don't regularly inspect closed PRs. Which is particularly unpleasant given that I closed the PR.

I went over your explanations and now understand better. I will try to rerun CI on this or on a copy, and then see if we can get this fix in.

@arielshaqed arielshaqed reopened this Aug 3, 2023
@FredrikBakken
Contributor Author

No worries, @arielshaqed! Great to hear, and please let me know if there is anything I can do on my side.

@FredrikBakken FredrikBakken changed the title from "Fix DAG import error" to "bugfix: DAG import error for TaskGroup" on Aug 8, 2023
@FredrikBakken
Contributor Author

@arielshaqed, I've updated the title of this PR to better define the scope of what it solves. The operators listed in the #34 table work well as standalone tasks, but once the tasks are placed within a TaskGroup the error appears. This PR solves this by removing self.task_id = kwargs.get("task_id") from the operators that throw the error.
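
A minimal sketch of one plausible mechanism behind this, written in plain Python so it runs without Airflow. It assumes that the base operator prefixes task_id with the group id when a task group is given, and that the removed line then overwrote the prefixed value with the raw one. All class names are hypothetical stand-ins, not the provider's or Airflow's real code.

```python
# Hypothetical stand-ins; the prefixing rule below is an assumption
# about how a task group renames its children.
class SketchTaskGroup:
    def __init__(self, group_id):
        self.group_id = group_id

    def child_id(self, label):
        # Children are namespaced as "<group_id>.<label>".
        return f"{self.group_id}.{label}"


class SketchBase:
    def __init__(self, task_id, task_group=None, **kwargs):
        # The base class decides the final task_id.
        self.task_id = task_group.child_id(task_id) if task_group else task_id


class UploadBeforeFix(SketchBase):
    def __init__(self, repo, **kwargs):
        super().__init__(**kwargs)
        self.repo = repo
        # The line the PR removes: it replaces the prefixed id with the raw one.
        self.task_id = kwargs.get("task_id")


class UploadAfterFix(SketchBase):
    def __init__(self, repo, **kwargs):
        super().__init__(**kwargs)
        self.repo = repo


group = SketchTaskGroup("ingest")
before = UploadBeforeFix(repo="bronze", task_id="upload_file", task_group=group)
after = UploadAfterFix(repo="bronze", task_id="upload_file", task_group=group)
solo = UploadBeforeFix(repo="bronze", task_id="upload_file")

print(before.task_id)  # upload_file (group prefix lost)
print(after.task_id)   # ingest.upload_file
print(solo.task_id)    # upload_file (no group, so nothing to lose)
```

Under these assumptions the overwrite is harmless for a standalone task, which would match the observation that the operators only fail inside a TaskGroup.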

@FredrikBakken FredrikBakken force-pushed the fix-dag-error branch 2 times, most recently from 97ea5c3 to da2e8e6 Compare August 13, 2023 18:04
@FredrikBakken
Contributor Author

@arielshaqed, should I add some provider tests to this PR to confirm that the following operators (LakeFSCommitOperator, LakeFSGetCommitOperator, LakeFSMergeOperator, and LakeFSUploadOperator) now work as expected inside TaskGroups?

@arielshaqed
Contributor

> @arielshaqed, should I add some provider tests to this PR to confirm that the following operators (LakeFSCommitOperator, LakeFSGetCommitOperator, LakeFSMergeOperator, and LakeFSUploadOperator) now work as expected inside TaskGroups?

Sorry. I opened #77 for this; I've blocked this PR for too long already.

Contributor

@arielshaqed arielshaqed left a comment

THANKS!

@arielshaqed arielshaqed merged commit fd52426 into treeverse:main Aug 15, 2023
Development

Successfully merging this pull request may close these issues.

Azure: Managed Airflow DAG Import Errors
LakeFS operators not working in task group
DAG Import Errors
3 participants