Azure: Managed Airflow DAG Import Errors #69

Closed
FredrikBakken opened this issue Aug 2, 2023 · 4 comments · Fixed by #35

@FredrikBakken
Contributor

Versions

Airflow: 2.4.3
LakeFS: 0.104.0
airflow-provider-lakefs: 0.46.2

Error Message

Broken DAG: [/opt/airflow/dags/news_data.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskmixin.py", line 252, in <listcomp>
    return [self.dag.get_task(tid) for tid in self.upstream_task_ids]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2256, in get_task
    raise TaskNotFound(f"Task {task_id} not found")
airflow.exceptions.TaskNotFound: Task commit_changes not found

Steps to Reproduce

  1. Create Storage Account and Container with a dags directory.
  2. Upload DAG with LakeFSCommitOperator
  3. Create Azure Data Factory with Managed Airflow (preview)
  4. Ingest DAGs from Container into Airflow instance
  5. Open Airflow UI

Same error as found in this: #34

Suspect that changes in linked PR should solve it: #35

@itaiad200 itaiad200 added bug Something isn't working contributor labels Aug 3, 2023
@itaiad200 itaiad200 self-assigned this Aug 3, 2023
@FredrikBakken
Contributor Author

Follow-up to this issue: I have now tried the lakefs-dag.py example, which works as expected. No DAG import error is thrown.

I then tried to comment out every existing LakeFSCommitOperator in my DAG, and replace it with the following:

task_commit = LakeFSCommitOperator(
    task_id='commit',
    repo=default_args.get('repo-iron'),
    msg="123",
    metadata={"committed_from": "airflow-operator"}
)

But this still gives me the following error message:

Broken DAG: [/opt/airflow/dags/news_data.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskmixin.py", line 245, in <listcomp>
    return [self.dag.get_task(tid) for tid in self.downstream_task_ids]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2256, in get_task
    raise TaskNotFound(f"Task {task_id} not found")
airflow.exceptions.TaskNotFound: Task commit not found

@FredrikBakken
Contributor Author

I slimmed my DAG down to a minimal example that still produces the error message on my end:

import pendulum

from airflow.decorators import dag
from airflow.utils.task_group import TaskGroup

from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor


default_args = {
    "owner": "example",
    "repo-coal": "coal",
    "repo-iron": "iron",
    "repo-gold": "gold",
    "default-branch": "main",
    "branch": "example_data",
    "default-path": "example_data",
    "lakefs_conn_id": "lakefs",
}


@dag(
    default_args=default_args,
    description="example_data",
    start_date=pendulum.datetime(2023, 7, 1, tz="Europe/Oslo"),
    max_active_runs=1,
    catchup=False,
    tags=["example_data"])
def example_pipeline():
    date = "{{ ds }}"

    with TaskGroup(group_id="coal") as coal:
        task_sense_new_files = LakeFSFileSensor(
            task_id="sense_new_files",
            lakefs_conn_id=default_args.get("lakefs_conn_id"),
            repo=default_args.get("repo-coal"),
            branch=default_args.get("default-branch"),
            path=f"{default_args.get('default-path')}/{date}.txt",
        )

        task_sense_new_files

    with TaskGroup(group_id="iron") as iron:
        task_create_branch = LakeFSCreateBranchOperator(
            task_id="create_branch",
            repo=default_args.get("repo-iron"),
            branch=f"{default_args.get('branch')}_{date}",
            source_branch="main")

        task_commit_changes = LakeFSCommitOperator(
            task_id='commit_changes',
            repo=default_args.get('repo-iron'),
            branch=f'{default_args.get("branch")}_{date}',
            msg=f'Added example_data data.',
            metadata={
                "committed_from": "airflow-operator",
                "date": f"{date}",
                "data_source": "example_data"})

        task_merge_branch = LakeFSMergeOperator(
            task_id='merge_branch',
            do_xcom_push=True,
            repo=default_args.get('repo-iron'),
            source_ref=f'{default_args.get("branch")}_{date}',
            destination_branch='main',
            msg=f'Merge latest example_data data.',
            metadata={
                "committed_from": "airflow-operator",
                "date": f"{date}",
                "data_source": "example_data"})

        (
            task_create_branch >>
            task_commit_changes >>
            task_merge_branch
        )

    with TaskGroup(group_id="gold") as gold:
        task_create_branch = LakeFSCreateBranchOperator(
            task_id="create_branch",
            repo=default_args.get("repo-gold"),
            branch=f"{default_args.get('branch')}_{date}",
            source_branch="main")

        task_commit_changes = LakeFSCommitOperator(
            task_id='commit_changes',
            repo=default_args.get('repo-gold'),
            branch=f'{default_args.get("branch")}_{date}',
            msg=f'Added example_data data.',
            metadata={
                "committed_from": "airflow-operator",
                "date": f"{date}",
                "data_source": "example_data"})

        task_merge_branch = LakeFSMergeOperator(
            task_id='merge_branch',
            do_xcom_push=True,
            repo=default_args.get('repo-gold'),
            source_ref=f'{default_args.get("branch")}_{date}',
            destination_branch='main',
            msg=f'Merge latest example_data data.',
            metadata={
                "committed_from": "airflow-operator",
                "date": f"{date}",
                "data_source": "example_data"})

        (
            task_create_branch >>
            task_commit_changes >>
            task_merge_branch
        )

    coal >> iron >> gold


example_pipeline()

@FredrikBakken
Contributor Author

I am using TaskGroups in my DAG, which might also be related to this issue: #44
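
To make the TaskGroup suspicion concrete, here is a toy model in plain Python. It is not Airflow or provider code. It relies on two things: Airflow prefixes the IDs of tasks inside a TaskGroup with the group ID by default (so `commit_changes` in group `iron` is registered as `iron.commit_changes`), and, as the traceback shows, dependencies are resolved by ID through `dag.get_task(tid)`. The classes `ToyDag`, `ToyOperator` and `OverwritingOperator` are hypothetical names invented for this sketch. The idea that an operator resets its own `task_id` after registration is an assumption about how such a mismatch could arise, not something this thread confirms.

```python
class TaskNotFound(Exception):
    """Stand-in for airflow.exceptions.TaskNotFound (toy model only)."""


class ToyDag:
    """Toy registry mapping the ID a task was registered under to the task."""

    def __init__(self):
        self.task_dict = {}

    def add_task(self, task):
        self.task_dict[task.task_id] = task

    def get_task(self, task_id):
        if task_id not in self.task_dict:
            raise TaskNotFound(f"Task {task_id} not found")
        return self.task_dict[task_id]


class ToyOperator:
    """Registers itself under '<group_id>.<task_id>' when given a group."""

    def __init__(self, dag, task_id, group_id=None):
        self.dag = dag
        self.task_id = f"{group_id}.{task_id}" if group_id else task_id
        self.downstream_task_ids = set()
        dag.add_task(self)

    def __rshift__(self, other):
        # Dependencies are stored by ID, not by object reference.
        self.downstream_task_ids.add(other.task_id)
        return other

    @property
    def downstream_list(self):
        # Mirrors the lookup-by-ID pattern seen in the traceback.
        return [self.dag.get_task(tid) for tid in sorted(self.downstream_task_ids)]


class OverwritingOperator(ToyOperator):
    """Hypothetical subclass that resets task_id after registration."""

    def __init__(self, dag, task_id, group_id=None):
        super().__init__(dag, task_id, group_id)
        self.task_id = task_id  # assumption: this drops the group prefix


def missing_downstream(operator_cls):
    """Return the lookup error message, or None if all IDs resolve."""
    dag = ToyDag()
    create = ToyOperator(dag, "create_branch", group_id="iron")
    commit = operator_cls(dag, "commit_changes", group_id="iron")
    create >> commit
    try:
        create.downstream_list
    except TaskNotFound as err:
        return str(err)
    return None
```

In this model, `missing_downstream(ToyOperator)` resolves cleanly, while `missing_downstream(OverwritingOperator)` fails with the same "Task commit_changes not found" text as the reported error, because the registry only knows the prefixed ID.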

@itaiad200 itaiad200 assigned arielshaqed and unassigned itaiad200 Aug 3, 2023
@FredrikBakken
Contributor Author

FredrikBakken commented Aug 4, 2023

I did some further testing this morning by modifying the lakefs-dag.py demonstration DAG. In this case, I updated the DAG to introduce TaskGroups, to see whether the error is then displayed in the UI.

Updated DAG can be found below:

from typing import Dict
from typing import Sequence

from collections import namedtuple
from itertools import zip_longest
import time

from io import StringIO

from airflow.decorators import dag
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
from airflow.exceptions import AirflowFailException

from lakefs_provider.hooks.lakefs_hook import LakeFSHook
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.create_symlink_operator import LakeFSCreateSymlinkOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
from lakefs_provider.operators.upload_operator import LakeFSUploadOperator
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.get_commit_operator import LakeFSGetCommitOperator
from lakefs_provider.operators.get_object_operator import LakeFSGetObjectOperator
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor
from lakefs_provider.sensors.commit_sensor import LakeFSCommitSensor
from airflow.operators.python import PythonOperator

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "lakeFS",
    "branch": "example-branch",
    "repo": "example-repo",
    "path": "path/to/_SUCCESS",
    "default-branch": "main",
    "lakefs_conn_id": "conn_lakefs"
}

CONTENT_PREFIX = 'It is not enough to succeed.  Others must fail.'
COMMIT_MESSAGE_1 = 'committing to lakeFS using airflow!'
MERGE_MESSAGE_1 = 'merging to the default branch'

IdAndMessage = namedtuple('IdAndMessage', ['id', 'message'])


def check_expected_prefix(task_instance, actual: str, expected: str) -> None:
    if not actual.startswith(expected):
        raise AirflowFailException(f'Got:\n"{actual}"\nwhich does not start with\n{expected}')


def check_logs(task_instance, repo: str, ref: str, commits: Sequence[str], messages: Sequence[str],
               amount: int = 100) -> None:
    hook = LakeFSHook(default_args['lakefs_conn_id'])
    expected = [IdAndMessage(commit, message) for commit, message in zip(commits, messages)]
    actuals = (IdAndMessage(message=commit['message'], id=commit['id'])
               for commit in hook.log_commits(repo, ref, amount))
    for (expected, actual) in zip_longest(expected, actuals):
        if expected is None:
            # Matched all msgs!
            return
        if expected != actual:
            raise AirflowFailException(f'Got {actual} instead of {expected}')


class NamedStringIO(StringIO):
    def __init__(self, content: str, name: str) -> None:
        super().__init__(content)
        self.name = name


@dag(default_args=default_args,
     render_template_as_native_obj=True,
     max_active_runs=1,
     start_date=days_ago(2),
     schedule_interval=None,
     tags=['testing'])
def lakeFS_workflow():
    expected_commits = ['''{{ ti.xcom_pull('merge_branches') }}''',
                        '''{{ ti.xcom_pull('commit') }}''']
    expected_messages = [MERGE_MESSAGE_1, COMMIT_MESSAGE_1]

    with TaskGroup(group_id='p1') as p1:
        task_create_branch = LakeFSCreateBranchOperator(
            task_id='create_branch',
            source_branch=default_args.get('default-branch')
        )

        task_get_branch_commit = LakeFSGetCommitOperator(
            do_xcom_push=True,
            task_id='get_branch_commit',
            ref=default_args['branch'])

        task_create_branch >> task_get_branch_commit

    with TaskGroup(group_id='p2') as p2:
        task_sense_commit = LakeFSCommitSensor(
            task_id='sense_commit',
            prev_commit_id='''{{ task_instance.xcom_pull(task_ids='get_branch_commit', key='return_value').id }}''',
            mode='reschedule',
            poke_interval=1,
            timeout=10)

        task_merge = LakeFSMergeOperator(
            task_id='merge_branches',
            do_xcom_push=True,
            source_ref=default_args.get('branch'),
            destination_branch=default_args.get('default-branch'),
            msg=MERGE_MESSAGE_1,
            metadata={"committer": "airflow-operator"})

        task_check_logs_bulk = PythonOperator(
            task_id='check_logs_bulk',
            python_callable=check_logs,
            op_kwargs={
                'repo': default_args.get('repo'),
                'ref': '''{{ task_instance.xcom_pull(task_ids='merge_branches', key='return_value') }}''',
                'commits': expected_commits,
                'messages': expected_messages})

        task_check_logs_individually = PythonOperator(
            task_id='check_logs_individually',
            python_callable=check_logs,
            op_kwargs={
                'repo': default_args.get('repo'),
                'ref': '''{{ task_instance.xcom_pull(task_ids='merge_branches', key='return_value') }}''',
                'amount': 1,
                'commits': expected_commits,
                'messages': expected_messages})

        task_sense_commit >> task_merge >> [task_check_logs_bulk, task_check_logs_individually]

    with TaskGroup(group_id='p3') as p3:
        task_sense_file = LakeFSFileSensor(
            task_id='sense_file',
            mode='reschedule',
            poke_interval=1,
            timeout=10)

        task_get_file = LakeFSGetObjectOperator(
            task_id='get_object',
            do_xcom_push=True,
            ref=default_args['branch'])

        task_check_contents = PythonOperator(
            task_id='check_expected_prefix',
            python_callable=check_expected_prefix,
            op_kwargs={
                'actual': '''{{ task_instance.xcom_pull(task_ids='get_object', key='return_value') }}''',
                'expected': CONTENT_PREFIX})

        task_sense_file >> task_get_file >> task_check_contents

    with TaskGroup(group_id='p4') as p4:
        task_create_file = LakeFSUploadOperator(
            task_id='upload_file',
            content=NamedStringIO(content=f"{CONTENT_PREFIX} @{time.asctime()}", name='content'))

        task_commit = LakeFSCommitOperator(
            task_id='commit',
            msg=COMMIT_MESSAGE_1,
            metadata={"committed_from": "airflow-operator"})

        task_create_symlink = LakeFSCreateSymlinkOperator(task_id="create_symlink")

        task_create_file >> task_commit >> task_create_symlink

    p1 >> [p2, p3, p4]


lakeFS_workflow()

This is the expected UI render after introducing TaskGroups:
[Image: expected UI render with TaskGroups]

Instead, I am seeing the same error message as reported.

However, it is working as expected in a local Airflow instance where task_id is removed from operators as done here: #35
