HowTo: use Airflow
Directed Acyclic Graph (DAG) files define the scheduled tasks to be performed by Apache Airflow.
- An ETL process requires an airflow `mydagfile.py` file in the `./airflow_home/dags` folder (a minimal skeleton follows this list)
- The DAG file should be developed to invoke module-specific class methods
- Any small text files can be stored in the `wrangler/data` folder for each module
- Unstructured larger files will be stored in either an S3 bucket or a NoSQL DB
- Structured data files will be stored in a relational database such as Postgres
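For orientation, here is a minimal sketch of such a `mydagfile.py`, assuming Airflow 2.x imports; the dag_id, schedule, start date, and the `some_function` body are placeholders, not part of the rezaware codebase:

```python
### ./airflow_home/dags/mydagfile.py -- a minimal placeholder sketch
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

def some_function(**kwargs):
    ### invoke the module-specific class methods here
    pass

with DAG(
    dag_id='my_module_etl',                # placeholder unique name
    start_date=datetime(2023, 9, 1),       # placeholder start date
    schedule_interval=timedelta(hours=5),  # how often Airflow runs the DAG
    catchup=False,                         # skip back-filling missed runs
) as dag:
    some_task = PythonOperator(
        task_id='some_task',
        python_callable=some_function,
    )
```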
- Any of the Apps: Mining, Wrangler, and Visuals can have their own set of DAGs.
- The DAG files will be managed in each App's `App/dags` folder; for example, `wrangler/dags/ota/scraper`
- We will have a process (sketched after this list) that will:
  - copy each App's DAG files to the airflow dag folder, typically in `~/airflow/dags`,
  - resembling the same folder structure as maintained in the `App/dags` folder structure
  - for example, `wrangler/dags/ota/scraper/otaAirlineScraper.py` will be copied to `~/airflow/dags/ota/scraper/otaAirlineScraper.py`
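No such copy process exists yet; a minimal sketch using only the standard library, where `deploy_dags` and its `app_dags_root` and `airflow_dags_root` paths are assumed names for illustration:

```python
### hypothetical helper: mirror an App's dags folder into the airflow dags folder
import os
import shutil

def deploy_dags(app_dags_root="wrangler/dags",
                airflow_dags_root=os.path.expanduser("~/airflow/dags")):
    """ copy DAG files, preserving the App's dags folder structure """
    for dirpath, _, filenames in os.walk(app_dags_root):
        ### rebuild the same relative path under ~/airflow/dags
        rel_path = os.path.relpath(dirpath, app_dags_root)
        target_dir = os.path.join(airflow_dags_root, rel_path)
        os.makedirs(target_dir, exist_ok=True)
        for fname in filenames:
            if fname.endswith(".py"):
                shutil.copy2(os.path.join(dirpath, fname),
                             os.path.join(target_dir, fname))

### e.g. deploy_dags("wrangler/dags") would copy
### wrangler/dags/ota/scraper/otaAirlineScraper.py
### to ~/airflow/dags/ota/scraper/otaAirlineScraper.py
```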
- Ideally, each DAG file should perform tasks related to a single module entity and functional package.
- Complicated scheduled tasks that span multiple module entities and functional packages should be broken down into smaller, manageable, interlinked DAGs.
- The DAG files are stored and managed following the same module entity and functional package folder structure: `app/dags/entity/function/somefile.dag`
- A DAG should only use class methods defined in an `app/module/entity/function/package.py` class
- Before creating a DAG file:
  - use a notebook to emulate the DAG functions (see the sketch after this list),
  - thereafter, transfer the function calls to the DAG file
  - any modifications to the DAG file should first be tested using the associated Notebook
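For example, a notebook cell might exercise the same call that the DAG function will later make. A minimal sketch, where `SomeScraperClass` and `some_method` are hypothetical placeholders rather than the actual propertyScrapers API:

```python
### notebook cell: emulate the DAG function call without Airflow
### SomeScraperClass and some_method are hypothetical placeholders,
### not the actual propertyScrapers API
import sys
sys.path.insert(1, "/home/nuwan/workspace/rezaware/")
from wrangler.modules.ota.scraper import propertyScrapers as ps

clsScraper = ps.SomeScraperClass()    # hypothetical scraper class
_results = clsScraper.some_method()   # hypothetical method the DAG will invoke
print(_results)                       # inspect the output before writing the DAG
```

Once the notebook run behaves as expected, the same calls are transferred into the DAG file. The block below walks through the typical sections of such a file.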
```python
### import standard python classes
import os
import sys
from datetime import datetime, date, timedelta
import traceback
import logging
from textwrap import dedent   # used for the task documentation below

### import airflow classes (needed for the PythonOperator tasks below)
from airflow.operators.python import PythonOperator

### inhouse libraries to initialize, extract, transform, and load
sys.path.insert(1,"/home/nuwan/workspace/rezaware/")
import rezaware as reza
from wrangler.modules.ota.scraper import propertyScrapers as ps #, scraperUtils as otasu
from utils.modules.etl.load import sparkwls as spark

__search_window_days__ = 1    # set the search window length
__page_offset__ = 1           # set the page offset for building URLs
__page_upper_limit__ = 3      # maximum number of pages for building URLs
__schedule_interval__ = 5     # number of hours between runs

def some_function(*args, **kwargs):
    ### your code ###
    ### pass data between functions using xcom ###
    ti = kwargs['ti']
    ti.xcom_push(key='your_push_var_name', value='associated_value_name')
    ### retrieve data from another function with xcom ###
    ti = kwargs['ti']
    _your_pull_var_name = ti.xcom_pull(key='your_push_var_name')

#### Define the tasks to execute the function
### task to execute the some_function ###
some_task_x = PythonOperator(
    task_id='some_task_unique_name',
    python_callable=some_function,   # the callable defined above
)
some_task_x.doc_md = dedent(
    """\
    #### give the documentation a heading
    describe the rest of the content here; it will appear in the airflow tasks manager
    """
)

### chain the tasks to set the execution order; parallel_task_u, parallel_task_v,
### and final_task_z would be PythonOperator tasks defined like some_task_x above
[parallel_task_u, parallel_task_v] >> some_task_x >> final_task_z
```
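Note that the snippet above omits the DAG object itself; in a complete file the tasks are declared inside a DAG context so Airflow can discover them. A minimal sketch, where the dag_id and start_date are placeholder assumptions and `__schedule_interval__` is the setting defined at the top of the file:

```python
### minimal sketch of the surrounding DAG context (dag_id/start_date are placeholders)
from datetime import datetime, timedelta
from airflow import DAG

with DAG(
    dag_id='some_unique_dag_name',
    start_date=datetime(2023, 9, 1),
    schedule_interval=timedelta(hours=__schedule_interval__),  # settings variable above
    catchup=False,
) as dag:
    ### declare some_task_x and the other PythonOperator tasks here,
    ### then chain them as shown above
    ...
```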
Rezaware abstract BI augmented AI/ML entity framework © 2022 by Nuwan Waidyanatha is licensed under Creative Commons Attribution 4.0 International