Azure: Managed Airflow DAG Import Errors #69
Follow-up to this issue. I now tried to comment out every existing task except:

```python
task_commit = LakeFSCommitOperator(
    task_id='commit',
    repo=default_args.get('repo-iron'),
    msg="123",
    metadata={"committed_from": "airflow-operator"}
)
```

But this still gives me the following error message:
[screenshot of the DAG import error]
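One way to rule out a missing or mismatched provider package, a common cause of this kind of DAG import error, is to check from inside the environment that `lakefs_provider` is importable and which version is installed. A minimal sketch, assuming Python 3.8+ (the distribution name `airflow-provider-lakefs` is taken from the Versions section below):

```python
# Minimal sketch: confirm the lakeFS provider is importable in the Airflow
# environment and report the installed distribution version.
from importlib.metadata import PackageNotFoundError, version

try:
    import lakefs_provider  # noqa: F401  # same package the DAGs import from
    print("airflow-provider-lakefs:", version("airflow-provider-lakefs"))
except (ImportError, PackageNotFoundError) as exc:
    print("lakeFS provider not available:", exc)
```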
I tried to slim down an example DAG where I still see the error message on my end:

```python
import pendulum
from airflow.decorators import dag
from airflow.utils.task_group import TaskGroup
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor

default_args = {
    "owner": "example",
    "repo-coal": "coal",
    "repo-iron": "iron",
    "repo-gold": "gold",
    "default-branch": "main",
    "branch": "example_data",
    "default-path": "example_data",
    "lakefs_conn_id": "lakefs",
}


@dag(
    default_args=default_args,
    description="example_data",
    start_date=pendulum.datetime(2023, 7, 1, tz="Europe/Oslo"),
    max_active_runs=1,
    catchup=False,
    tags=["example_data"])
def example_pipeline():
    date = "{{ ds }}"

    with TaskGroup(group_id="coal") as coal:
        task_sense_new_files = LakeFSFileSensor(
            task_id="sense_new_files",
            lakefs_conn_id=default_args.get("lakefs_conn_id"),
            repo=default_args.get("repo-coal"),
            branch=default_args.get("default-branch"),
            path=f"{default_args.get('default-path')}/{date}.txt",
        )
        task_sense_new_files

    with TaskGroup(group_id="iron") as iron:
        task_create_branch = LakeFSCreateBranchOperator(
            task_id="create_branch",
            repo=default_args.get("repo-iron"),
            branch=f"{default_args.get('branch')}_{date}",
            source_branch="main")
        task_commit_changes = LakeFSCommitOperator(
            task_id='commit_changes',
            repo=default_args.get('repo-iron'),
            branch=f'{default_args.get("branch")}_{date}',
            msg='Added example_data data.',
            metadata={
                "committed_from": "airflow-operator",
                "date": f"{date}",
                "data_source": "example_data"})
        task_merge_branch = LakeFSMergeOperator(
            task_id='merge_branch',
            do_xcom_push=True,
            repo=default_args.get('repo-iron'),
            source_ref=f'{default_args.get("branch")}_{date}',
            destination_branch='main',
            msg='Merge latest example_data data.',
            metadata={
                "committed_from": "airflow-operator",
                "date": f"{date}",
                "data_source": "example_data"})
        (
            task_create_branch >>
            task_commit_changes >>
            task_merge_branch
        )

    with TaskGroup(group_id="gold") as gold:
        task_create_branch = LakeFSCreateBranchOperator(
            task_id="create_branch",
            repo=default_args.get("repo-gold"),
            branch=f"{default_args.get('branch')}_{date}",
            source_branch="main")
        task_commit_changes = LakeFSCommitOperator(
            task_id='commit_changes',
            repo=default_args.get('repo-gold'),
            branch=f'{default_args.get("branch")}_{date}',
            msg='Added example_data data.',
            metadata={
                "committed_from": "airflow-operator",
                "date": f"{date}",
                "data_source": "example_data"})
        task_merge_branch = LakeFSMergeOperator(
            task_id='merge_branch',
            do_xcom_push=True,
            repo=default_args.get('repo-gold'),
            source_ref=f'{default_args.get("branch")}_{date}',
            destination_branch='main',
            msg='Merge latest example_data data.',
            metadata={
                "committed_from": "airflow-operator",
                "date": f"{date}",
                "data_source": "example_data"})
        (
            task_create_branch >>
            task_commit_changes >>
            task_merge_branch
        )

    coal >> iron >> gold


example_pipeline()
```
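To reproduce the import error outside the managed UI, one can parse the DAG file locally with Airflow's `DagBag`, which records per-file import errors. A minimal sketch, assuming the file is saved under a local `dags/` folder:

```python
# Minimal sketch: parse a local DAG folder and print any import errors,
# mirroring what the Airflow UI shows at the top of the DAGs page.
from airflow.models import DagBag

bag = DagBag(dag_folder="dags", include_examples=False)
for path, error in bag.import_errors.items():
    print(f"{path}:\n{error}")
```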
I am using the versions listed in the Versions section below.
Did some further testing this morning by changing up the example DAG. The updated DAG can be found below:

```python
from typing import Dict
from typing import Sequence
from collections import namedtuple
from itertools import zip_longest
import time
from io import StringIO

from airflow.decorators import dag
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator

from lakefs_provider.hooks.lakefs_hook import LakeFSHook
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.create_symlink_operator import LakeFSCreateSymlinkOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
from lakefs_provider.operators.upload_operator import LakeFSUploadOperator
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.get_commit_operator import LakeFSGetCommitOperator
from lakefs_provider.operators.get_object_operator import LakeFSGetObjectOperator
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor
from lakefs_provider.sensors.commit_sensor import LakeFSCommitSensor

# These args will get passed on to each operator.
# You can override them on a per-task basis during operator initialization.
default_args = {
    "owner": "lakeFS",
    "branch": "example-branch",
    "repo": "example-repo",
    "path": "path/to/_SUCCESS",
    "default-branch": "main",
    "lakefs_conn_id": "conn_lakefs"
}

CONTENT_PREFIX = 'It is not enough to succeed. Others must fail.'
COMMIT_MESSAGE_1 = 'committing to lakeFS using airflow!'
MERGE_MESSAGE_1 = 'merging to the default branch'

IdAndMessage = namedtuple('IdAndMessage', ['id', 'message'])


def check_expected_prefix(task_instance, actual: str, expected: str) -> None:
    if not actual.startswith(expected):
        raise AirflowFailException(f'Got:\n"{actual}"\nwhich does not start with\n{expected}')


def check_logs(task_instance, repo: str, ref: str, commits: Sequence[str], messages: Sequence[str],
               amount: int = 100) -> None:
    hook = LakeFSHook(default_args['lakefs_conn_id'])
    expected = [IdAndMessage(commit, message) for commit, message in zip(commits, messages)]
    actuals = (IdAndMessage(message=commit['message'], id=commit['id'])
               for commit in hook.log_commits(repo, ref, amount))
    for (expected, actual) in zip_longest(expected, actuals):
        if expected is None:
            # Matched all messages!
            return
        if expected != actual:
            raise AirflowFailException(f'Got {actual} instead of {expected}')


class NamedStringIO(StringIO):
    def __init__(self, content: str, name: str) -> None:
        super().__init__(content)
        self.name = name


@dag(default_args=default_args,
     render_template_as_native_obj=True,
     max_active_runs=1,
     start_date=days_ago(2),
     schedule_interval=None,
     tags=['testing'])
def lakeFS_workflow():
    expected_commits = ['''{{ ti.xcom_pull('merge_branches') }}''',
                        '''{{ ti.xcom_pull('commit') }}''']
    expected_messages = [MERGE_MESSAGE_1, COMMIT_MESSAGE_1]

    with TaskGroup(group_id='p1') as p1:
        task_create_branch = LakeFSCreateBranchOperator(
            task_id='create_branch',
            source_branch=default_args.get('default-branch')
        )
        task_get_branch_commit = LakeFSGetCommitOperator(
            do_xcom_push=True,
            task_id='get_branch_commit',
            ref=default_args['branch'])
        task_create_branch >> task_get_branch_commit

    with TaskGroup(group_id='p2') as p2:
        task_sense_commit = LakeFSCommitSensor(
            task_id='sense_commit',
            prev_commit_id='''{{ task_instance.xcom_pull(task_ids='get_branch_commit', key='return_value').id }}''',
            mode='reschedule',
            poke_interval=1,
            timeout=10)
        task_merge = LakeFSMergeOperator(
            task_id='merge_branches',
            do_xcom_push=True,
            source_ref=default_args.get('branch'),
            destination_branch=default_args.get('default-branch'),
            msg=MERGE_MESSAGE_1,
            metadata={"committer": "airflow-operator"})
        task_check_logs_bulk = PythonOperator(
            task_id='check_logs_bulk',
            python_callable=check_logs,
            op_kwargs={
                'repo': default_args.get('repo'),
                'ref': '''{{ task_instance.xcom_pull(task_ids='merge_branches', key='return_value') }}''',
                'commits': expected_commits,
                'messages': expected_messages})
        task_check_logs_individually = PythonOperator(
            task_id='check_logs_individually',
            python_callable=check_logs,
            op_kwargs={
                'repo': default_args.get('repo'),
                'ref': '''{{ task_instance.xcom_pull(task_ids='merge_branches', key='return_value') }}''',
                'amount': 1,
                'commits': expected_commits,
                'messages': expected_messages})
        task_sense_commit >> task_merge >> [task_check_logs_bulk, task_check_logs_individually]

    with TaskGroup(group_id='p3') as p3:
        task_sense_file = LakeFSFileSensor(
            task_id='sense_file',
            mode='reschedule',
            poke_interval=1,
            timeout=10)
        task_get_file = LakeFSGetObjectOperator(
            task_id='get_object',
            do_xcom_push=True,
            ref=default_args['branch'])
        task_check_contents = PythonOperator(
            task_id='check_expected_prefix',
            python_callable=check_expected_prefix,
            op_kwargs={
                'actual': '''{{ task_instance.xcom_pull(task_ids='get_object', key='return_value') }}''',
                'expected': CONTENT_PREFIX})
        task_sense_file >> task_get_file >> task_check_contents

    with TaskGroup(group_id='p4') as p4:
        task_create_file = LakeFSUploadOperator(
            task_id='upload_file',
            content=NamedStringIO(content=f"{CONTENT_PREFIX} @{time.asctime()}", name='content'))
        task_commit = LakeFSCommitOperator(
            task_id='commit',
            msg=COMMIT_MESSAGE_1,
            metadata={"committed_from": "airflow-operator"})
        task_create_symlink = LakeFSCreateSymlinkOperator(task_id="create_symlink")
        task_create_file >> task_commit >> task_create_symlink

    p1 >> [p2, p3, p4]


lakeFS_workflow()
```

This is the expected UI render after introducing the task groups: [screenshot of the DAG graph view]. But instead, I am seeing the same error message as reported. However, it is working as expected in a local Airflow instance.
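Since the same file parses locally but fails in the managed instance, diffing the providers registered in each environment can help narrow this down. A minimal sketch using Airflow 2's `ProvidersManager` (run it in both environments and compare the output):

```python
# Minimal sketch: list every registered Airflow provider and its version,
# so the local environment can be compared against the managed one.
from airflow.providers_manager import ProvidersManager

for name, info in sorted(ProvidersManager().providers.items()):
    print(name, info.version)
```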
Versions
- Airflow: 2.4.3
- LakeFS: 0.104.0
- airflow-provider-lakefs: 0.46.2
Error Message
[screenshot of the import error]
Steps to Reproduce
1. Add a DAG to the dags directory.
2. Use the LakeFSCommitOperator in that DAG (a minimal repro is sketched below).
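For illustration, a hypothetical minimal DAG for step 2, assuming only the commit operator is needed to trigger the error (the connection id and repository name are placeholders):

```python
# Hypothetical minimal repro: the only lakeFS dependency is the commit
# operator import that appears to trigger the reported error.
import pendulum
from airflow.decorators import dag
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator


@dag(start_date=pendulum.datetime(2023, 7, 1), schedule=None, catchup=False, tags=["repro"])
def lakefs_import_repro():
    LakeFSCommitOperator(
        task_id="commit",
        lakefs_conn_id="lakefs",  # placeholder connection id
        repo="example-repo",      # placeholder repository
        branch="main",
        msg="repro",
    )


lakefs_import_repro()
```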
Same error as found in this issue: #34
I suspect that the changes in the linked PR should solve it: #35