- Incorporate Docker to solve sharing and reproducibility challenges (different operating systems, versioning, lack of a defined process).
- Use Apache Airflow to orchestrate the pipeline.
- Exercise DAG creation.
- Use various Airflow operators like `BashOperator`, `PythonOperator`, and `FileSensor` (see the short sketch below).
- Set up the order of operations for each task.
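A minimal, generic sketch of these three operator types and of ordering tasks with `>>`, assuming Airflow 2.x import paths; the DAG id, task ids, command, and file path here are illustrative only, not taken from the project code:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor


def say_hello():
    print("hello from a PythonOperator task")


with DAG(
    dag_id="operator_demo",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # One example of each operator type used in this project.
    make_dir = BashOperator(task_id="make_dir", bash_command="mkdir -p /tmp/data")
    wait_for_file = FileSensor(task_id="wait_for_file", filepath="/tmp/data/ready.csv", poke_interval=30)
    process = PythonOperator(task_id="process", python_callable=say_hello)

    # Order of operations: create the directory, wait for the file, then process it.
    make_dir >> wait_for_file >> process
```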
NOTE: there are some redundant tasks within the Airflow DAG that could be compressed into fewer tasks. For example, `task_1` and `task_3` could be coded as one task (as sketched below). However, to illustrate the usage of different Airflow operators and the ordering of tasks, I keep the redundant tasks as they are.
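For illustration, a hedged sketch of how such a pair (download, then save to CSV) could be collapsed into a single `PythonOperator` callable; the function name, symbol, and output directory are placeholders, not the project's actual code:

```python
import yfinance as yf


def download_and_save(symbol: str, out_dir: str = "/tmp/data") -> None:
    # One day of prices at a one-minute interval, written straight to CSV.
    df = yf.download(tickers=symbol, period="1d", interval="1m")
    df.to_csv(f"{out_dir}/{symbol}_data.csv")


# Wired up as a single task, e.g.:
# PythonOperator(task_id="download_and_save_aapl",
#                python_callable=download_and_save,
#                op_kwargs={"symbol": "AAPL"})
```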
In this project, I created a data pipeline to extract online stock market data and deliver analytical results. Yahoo Finance is used as the data source via the `yfinance` Python library.
The source data follows this schema (a quick way to check it yourself is shown after the table):
| Columns | Type |
|---|---|
| Datetime | STRING |
| Open | DECIMAL |
| High | DECIMAL (highest price within the time interval) |
| Low | DECIMAL (lowest price within the time interval) |
| Close | DECIMAL (the last price of the time interval) |
| Adj Close | DECIMAL |
| Volume | DECIMAL |
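An optional, quick check of this schema (requires the `yfinance` package; the exact column layout can vary slightly between `yfinance` versions):

```python
import yfinance as yf

# One day of AAPL prices at a one-minute interval, keeping the Adj Close column.
df = yf.download(tickers="AAPL", period="1d", interval="1m", auto_adjust=False)
print(df.columns)      # Open, High, Low, Close, Adj Close, Volume
print(df.index.name)   # the index holds the Datetime values for intraday data
```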
We'll work with two stock symbols: `AAPL` and `TSLA`. The workflow is defined in `data_pipeline.py`, which is scheduled to run at 6 pm on every weekday (Mon - Fri) and performs the following steps:
- Download the daily price data at a one-minute interval for the two symbols. Each symbol has a separate task, Task 1 (`task_1`) and Task 2 (`task_2`), which run independently and in parallel.
- Sensing tasks (`sensing_task_1` & `sensing_task_2`) check the existence of `TSLA_data.csv` and `AAPL_data.csv` before the next tasks execute.
- Save both datasets into CSV files and load them into a directory. Each symbol has a separate task, Task 3 (`task_3`) and Task 4 (`task_4`), which run independently and in parallel.
- Run a custom query on the downloaded datasets for both symbols, Task 5 (`task_5`). Before this step executes, all previous tasks must complete (see the dependency sketch after this list).
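A hedged sketch of how this dependency graph can be wired up. The task ids, file names, and schedule come from the description above; the `dag_id`, which symbol each numbered task handles, the operator chosen for `task_3`/`task_4`, and all paths are assumptions or placeholders rather than the contents of `data_pipeline.py`:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor


def download(symbol):
    """Placeholder for the one-minute yfinance download + CSV dump."""


def run_query():
    """Placeholder for the custom query run by task_5."""


with DAG(
    dag_id="stock_data_pipeline",          # assumed name
    start_date=datetime(2021, 1, 1),
    schedule_interval="0 18 * * 1-5",      # 6 pm, Monday to Friday
    catchup=False,
) as dag:
    task_1 = PythonOperator(task_id="task_1", python_callable=download, op_args=["TSLA"])
    task_2 = PythonOperator(task_id="task_2", python_callable=download, op_args=["AAPL"])

    sensing_task_1 = FileSensor(task_id="sensing_task_1", filepath="TSLA_data.csv")
    sensing_task_2 = FileSensor(task_id="sensing_task_2", filepath="AAPL_data.csv")

    # Move each CSV into the target directory (placeholder paths).
    task_3 = BashOperator(task_id="task_3", bash_command="mv /tmp/TSLA_data.csv /tmp/data/")
    task_4 = BashOperator(task_id="task_4", bash_command="mv /tmp/AAPL_data.csv /tmp/data/")

    task_5 = PythonOperator(task_id="task_5", python_callable=run_query)

    # The two symbol branches run independently and in parallel;
    # task_5 waits for every upstream task to finish.
    task_1 >> sensing_task_1 >> task_3 >> task_5
    task_2 >> sensing_task_2 >> task_4 >> task_5
```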
All the tasks should execute successfully, like this:
We can check all the task logs in this folder.
Example of task 5's successful execution log file, which gives us a list as the output (see `get_last_stock_spread.py` for reference):
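For context, a hedged sketch of what a query like `get_last_stock_spread.py` might compute, assuming "spread" means High minus Low of the most recent one-minute bar for each symbol and that the CSVs live under an assumed `/tmp/data` directory (the actual script may differ):

```python
import pandas as pd


def get_last_stock_spread(csv_paths):
    """Return a list with one spread value per symbol's CSV file."""
    spreads = []
    for path in csv_paths:
        df = pd.read_csv(path)
        last_bar = df.iloc[-1]  # most recent one-minute row
        spreads.append(float(last_bar["High"] - last_bar["Low"]))
    return spreads


print(get_last_stock_spread(["/tmp/data/AAPL_data.csv", "/tmp/data/TSLA_data.csv"]))
```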
1. From a Bash shell, execute `./start.sh`; it will build and start all the services.
2. Wait until all the services are up and running; they should be healthy, as in the screenshot below:
3. Go to `localhost:8080` to access the web UI.
4. Log in with username `airflow` and password `airflow`.
5. Wait for the tasks to run on schedule, or trigger them manually to see the output.
6. After all the tasks complete, execute `./stop.sh` to stop the services.
7. Execute `./reset.sh` to wipe out all the images.
`log_analyzer.py` is created to monitor all the error messages within the log files. Run it with a command line like the following example:

`python3 log_analyzer.py /Volumes/Moon/SpringBoard/Airflow_MiniProject1/mnt/airflow/logs`

It should give us this output:
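For reference, a hedged sketch of what a log analyzer like this could do, assuming it simply walks the log directory passed on the command line and reports every line containing "ERROR" (the real script may differ):

```python
import sys
from pathlib import Path


def analyze(log_dir: str) -> None:
    """Print every ERROR line found in the Airflow log tree, plus a total count."""
    error_count = 0
    for log_file in Path(log_dir).rglob("*.log"):
        for line in log_file.read_text(errors="ignore").splitlines():
            if "ERROR" in line:
                error_count += 1
                print(f"{log_file}: {line}")
    print(f"Total number of errors: {error_count}")


if __name__ == "__main__":
    analyze(sys.argv[1])  # e.g. .../mnt/airflow/logs
```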
- Integrate `CeleryExecutor` in `airflow.cfg` and adjust the `docker-compose.yml` configs accordingly.
- Optimize the startup of the Airflow services in Docker, since it still takes quite a long time for them to come up successfully.