
HowTo: use Airflow


Introduction

Directed Acyclic Graph (DAG) files define the scheduled tasks to be performed by Apache Airflow.

Setting up an ETL Process

  • An ETL process requires an Airflow DAG file, e.g. mydagfile.py, in the ./airflow_home/dags folder
  • The DAG file should invoke module-specific class methods (see the sketch after this list)
  • Small text files can be stored in the wrangler/data folder for each module
  • Larger unstructured files will be stored in either an S3 bucket or a NoSQL DB
  • Structured data files will be stored in a relational database such as Postgres
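
For example, a minimal DAG callable only delegates to the module's class methods; the sketch below is illustrative, and the class and method names in it are assumptions, not the actual package API:

    ### minimal sketch of a DAG callable delegating to a module class
    from wrangler.modules.ota.scraper import propertyScrapers as ps

    def scrape_properties(**kwargs):
        ''' keep the logic in the module; the DAG file only schedules it '''
        clsScraper = ps.PropertyScraper()       # hypothetical class name
        return clsScraper.scrape_and_store()    # hypothetical method name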

How to write a DAG file

Standard practice

  • Any of the Apps: Mining, Wrangler, and Visuals can have their own set of DAGs.
    • The DAG files will be managed in each App's App/dags folder; for example, wrangler/dags/ota/scraper
  • We will have a process (sketched after this list) that will:
    • copy each App's DAG files to the Airflow DAG folder, typically ~/airflow/dags,
    • preserving the same folder structure as maintained in the App/dags folder
    • for example, wrangler/dags/ota/scraper/otaAirlineScraper.py will be copied to ~/airflow/dags/ota/scraper/otaAirlineScraper.py
  • Ideally, each DAG file should perform tasks related to a single module entity and functional package.
  • Complicated scheduled tasks spanning multiple module entities and functional packages should be broken down into smaller, interlinked DAGs.
  • The DAG files are stored and managed following the same module entity and functional package folder structure: app/dags/entity/function/somefile.dag
  • A DAG should only use class methods defined in an app/module/entity/function/package.py class
  • Before creating a DAG file:
    • use a notebook to emulate the DAG functions,
    • thereafter, transfer the function calls to the DAG file
    • any modifications to the DAG file should first be tested in the associated notebook
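
A minimal sketch of the copy step mentioned above, assuming the Apps live under /home/nuwan/workspace/rezaware (the path used in the imports further down) and the Airflow home is ~/airflow; the App names and workspace root are assumptions:

    ### copy each App's DAG files into ~/airflow/dags, preserving the folder structure
    import shutil
    from pathlib import Path

    APPS_ROOT = Path("/home/nuwan/workspace/rezaware")   # assumed workspace root
    AIRFLOW_DAGS = Path.home() / "airflow" / "dags"      # typical airflow dag folder

    for app in ["mining", "wrangler", "visuals"]:
        app_dags = APPS_ROOT / app / "dags"
        if not app_dags.is_dir():
            continue
        for src in app_dags.rglob("*.py"):
            ### keep the same structure below the App's dags folder
            dest = AIRFLOW_DAGS / src.relative_to(app_dags)
            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src, dest)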

Typical code sections

Non-airflow-related python package imports

    ### import standard python classes
    import os
    import sys
    from datetime import datetime, date, timedelta
    import traceback
    import logging
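
Airflow-related python package imports

The task and DAG definitions further down use DAG, PythonOperator, and textwrap.dedent, so they must be imported as well. The import paths below assume Airflow 2.x; older 1.x installations use airflow.operators.python_operator instead.

    ### airflow classes used to declare the DAG and its tasks
    from textwrap import dedent
    from airflow import DAG
    from airflow.operators.python import PythonOperator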

Our own python packages

    ### inhouse libraries to initialize, extract, transform, and load
    sys.path.insert(1,"/home/nuwan/workspace/rezaware/")
    import rezaware as reza
    from wrangler.modules.ota.scraper import propertyScrapers as ps #, scraperUtils as otasu
    from utils.modules.etl.load import sparkwls as spark

App vars

    __search_window_days__ = 1   # set the search window length 
    __page_offset__ = 1         # set the page offset for building URLs
    __page_upper_limit__ = 3   # maximum number of pages for building URLs
    __schedule_interval__ = 5   # number of hours between runs 
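
How these variables are consumed depends on the scraper package; as an illustration only (not the package's actual API), they might be turned into a search date range, a page range for URL building, and a schedule interval:

    ### illustrative use of the app vars above
    _search_start = date.today()
    _search_end = _search_start + timedelta(days=__search_window_days__)
    _page_offsets = range(__page_offset__, __page_upper_limit__ + 1)   # pages to build URLs for
    _schedule = timedelta(hours=__schedule_interval__)                 # used as the DAG's schedule_interval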

Define your functions

    def some_function(*args,**kwargs):
        ### your code ###

        ### pass data to another task using xcom ###
        ti = kwargs['ti']
        ti.xcom_push(key='your_push_var_name', value='associated_value_name')

        ### retrieve data pushed by another task with xcom ###
        ti = kwargs['ti']
        _your_pull_var_name = ti.xcom_pull(key='your_push_var_name')
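
In practice the push and the pull usually happen in two different callables, one per task. The pair below is a hedged example with made-up function and key names:

    ### example: two callables passing a value through xcom
    def extract_urls(*args,**kwargs):
        ti = kwargs['ti']
        _urls = ["https://example.com/?page=1"]          # placeholder result
        ti.xcom_push(key='scrape_url_list', value=_urls)

    def scrape_urls(*args,**kwargs):
        ti = kwargs['ti']
        _urls = ti.xcom_pull(key='scrape_url_list')
        ### hand _urls to the module class method here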

Define the tasks to execute the function

    ### task to execute some_function ###
    some_task_x = PythonOperator(
        task_id='some_task_unique_name',
        python_callable=some_function,
    )
    some_task_x.doc_md = dedent(
        """\
    #### give the documentation a heading

    describe the rest of the content here; it will appear in the Airflow UI's task documentation
    """
    )

"""```

Finally define the DAG

    ### set the task execution order ###
    [parallel_task_u, parallel_task_v] >> some_task_x >> final_task_z
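
The operators and the dependency line above must be attached to a DAG for the scheduler to pick them up. A minimal sketch, assuming Airflow 2.x; the dag_id, start date, tags, and placeholder tasks are assumptions, and the schedule reuses the __schedule_interval__ app var:

    ### minimal DAG declaration; dag_id, dates, tags, and placeholder tasks are assumptions
    with DAG(
        dag_id='ota_property_scraper',
        start_date=datetime(2023, 9, 1),
        schedule_interval=timedelta(hours=__schedule_interval__),
        catchup=False,
        tags=['wrangler', 'ota', 'scraper'],
    ) as dag:

        ### declare the tasks inside the DAG context (placeholder callables)
        parallel_task_u = PythonOperator(task_id='parallel_task_u', python_callable=some_function)
        parallel_task_v = PythonOperator(task_id='parallel_task_v', python_callable=some_function)
        some_task_x = PythonOperator(task_id='some_task_unique_name', python_callable=some_function)
        final_task_z = PythonOperator(task_id='final_task_z', python_callable=some_function)

        ### order the tasks
        [parallel_task_u, parallel_task_v] >> some_task_x >> final_task_z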