Commit

chore(copernicus): implement the new Copernicus api ETL (#34)
* chore(copernicus): implement the new Copernicus api ETL

* remove conda from container; update airflow to latest; clean unused tasks

* update dag & include dag for ARG

* include daily fetch with backfill for each adm 2
luabida authored Nov 26, 2024
1 parent 323c35c commit 64d3ad9
Showing 10 changed files with 5,862 additions and 777 deletions.
2 changes: 1 addition & 1 deletion alertflow/airflow.cfg
@@ -331,7 +331,7 @@ logging_level = INFO
# Logging level for celery. If not set, it uses the value of logging_level
#
# Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
celery_logging_level = DEBUG
celery_logging_level = INFO

# Logging level for Flask-appbuilder UI.
#
103 changes: 0 additions & 103 deletions alertflow/dags/episcanner/episcanner_export_data.py

This file was deleted.

93 changes: 93 additions & 0 deletions alertflow/dags/satellite-weather/argentina.py
@@ -0,0 +1,93 @@
"""
Author: Luã Bida Vacaro
Email: [email protected]
Github: https://github.com/luabida
Date: 2023-04-13
The COPERNICUS_ARG Airflow DAG collects daily weather
data from the Copernicus ERA5 Land Reanalysis dataset for all
cities in Argentina. The data includes temperature, precipitation,
humidity, and atmospheric pressure, collected daily from
January 1st, 2000 to the present day.
To ensure reliability and safety, the DAG has a 9-day delay
from the current date, as the Copernicus API usually takes
around 7 days to update the dataset.
"""

# import os
# import logging
# import calendar
# from datetime import timedelta, date
# from itertools import chain
#
# import pendulum
# from airflow import DAG
# from airflow.decorators import task
# from airflow.models import Variable
# from sqlalchemy import create_engine, text
#
# from satellite import request, ADM2
#
# env = os.getenv
# email_main = env("EMAIL_MAIN")
#
# DEFAULT_ARGS = {
# "owner": "AlertaDengue",
# "depends_on_past": False,
# # 'email': [email_main],
# "email_on_failure": True,
# "email_on_retry": False,
# "retries": 2,
# "retry_delay": timedelta(minutes=2),
# }
#
#
#
# with DAG(
# dag_id="COPERNICUS_ARG",
# description="ETL of weather data for Argentina",
# tags=["Argentina", "Copernicus"],
# schedule="@monthly",
# default_args=DEFAULT_ARGS,
# start_date=pendulum.datetime(2000, 1, 1),
# end_date=pendulum.datetime(2024, 1, 1),
# catchup=True,
# max_active_runs=14,
# ) as dag:
# DATE = "{{ ds }}" # DAG execution date
# KEY = Variable.get("cdsapi_key", deserialize_json=True)
# URI = Variable.get("psql_main_uri", deserialize_json=True)
#
# @task
# def fetch_ds(dt, uri, api_key):
# locale = "ARG"
# tablename = f"copernicus_{locale.lower()}"
# engine = create_engine(uri)
# dt = date.fromisoformat(dt)
# end_day = calendar.monthrange(dt.year, dt.month)[1]
# date_str = f"{dt.replace(day=1)}/{dt.replace(day=end_day)}"
# with engine.connect() as conn:
# cur = conn.execute(
# text(
# f"SELECT geocode FROM weather.{tablename}"
# f" WHERE date = '{dt}'"
# )
# )
# table_geocodes = set(chain(*cur.fetchall()))
#
# all_geocodes = set([adm.code for adm in ADM2.filter(adm0=locale)])
# geocodes = all_geocodes.difference(table_geocodes)
# print("TABLE_GEO ", f"[{len(table_geocodes)}]: ", table_geocodes)
# print("DIFF_GEO: ", f"[{len(geocodes)}]: ", geocodes)
#
# with request.reanalysis_era5_land(
# date_str.replace("/", "_") + locale,
# api_token=api_key,
# date=date_str,
# locale=locale,
# ) as ds:
# for adm in ADM2.filter(adm0=locale):
# with engine.connect() as conn:
# ds.cope.to_sql(adm, conn, tablename, "weather")
#
# fetch_ds(DATE, URI["PSQL_MAIN_URI"], KEY["CDSAPI_KEY"])
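
The COPERNICUS_ARG DAG above is committed fully commented out. Its fetch_ds task requests a whole month of data per run, building a "first-day/last-day" range string from the execution date before calling request.reanalysis_era5_land. A minimal sketch of that range construction, using only the standard library (month_range is an illustrative name, not part of the commit):

import calendar
from datetime import date

def month_range(dt: date) -> str:
    # Last calendar day of dt's month, e.g. 29 for February 2020.
    end_day = calendar.monthrange(dt.year, dt.month)[1]
    # Same "start/end" format the DAG passes as the date argument.
    return f"{dt.replace(day=1)}/{dt.replace(day=end_day)}"

print(month_range(date.fromisoformat("2020-02-15")))  # 2020-02-01/2020-02-29

Each monthly run would then fetch that full range once and write it out per ADM2 code.
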
107 changes: 29 additions & 78 deletions alertflow/dags/satellite-weather/brasil.py
@@ -16,11 +16,15 @@
"""

import os
from datetime import timedelta
from datetime import date, timedelta
from itertools import chain

import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from satellite import ADM2, request
from sqlalchemy import create_engine, text

env = os.getenv
email_main = env("EMAIL_MAIN")
@@ -40,98 +44,45 @@
dag_id="COPERNICUS_BRASIL",
description="ETL of weather data for Brazil",
tags=["Brasil", "Copernicus"],
schedule="@daily",
schedule="@monthly",
default_args=DEFAULT_ARGS,
start_date=pendulum.datetime(2024, 1, 1),
start_date=pendulum.datetime(2000, 1, 1),
end_date=pendulum.datetime(2024, 1, 1),
catchup=True,
max_active_runs=14,
):
from airflow.models import Variable

) as dag:
DATE = "{{ ds }}" # DAG execution date
DATA_DIR = "/tmp/copernicus"
KEY = Variable.get("cdsapi_key", deserialize_json=True)
URI = Variable.get("psql_main_uri", deserialize_json=True)

# fmt: off
@task.external_python(
task_id="daily_fetch",
python="/opt/py310/bin/python3.10"
)
# fmt: on
def extract_transform_load(
date: str, data_dir: str, api_key: str, psql_uri: str
) -> str:
"""
Due to incompatibility issues between Airflow's Python version
and the satellite-weather-downloader (SWD) package, this task
will be executed in a dedicated virtual environment, which
includes a pre-installed Python3.10 interpreter within the
container. All imports must be within the scope of the task,
and XCom sharing between tasks is not allowed.
The task is designed to receive the execution date and download
the weather dataset for that specific day. After downloading,
the data is transformed using Xarray and inserted into the Main
Postgres DB, as specified in the .env file, in the form of a
DataFrame containing the weather information.
"""
from datetime import timedelta
from itertools import chain
from pathlib import Path

from dateutil import parser
from satellite import downloader as sat_d
from satellite import weather as sat_w
from satellite.weather.brazil.extract_latlons import MUNICIPALITIES
from sqlalchemy import create_engine, text

start_date = parser.parse(str(date))
max_update_delay = start_date - timedelta(days=6)
@task
def fetch_ds(locale, dt, uri, api_key):
tablename = f"copernicus_{locale.lower()}"
engine = create_engine(uri)
dt = date.fromisoformat(dt) - timedelta(days=5)

with create_engine(psql_uri["PSQL_MAIN_URI"]).connect() as conn:
with engine.connect() as conn:
cur = conn.execute(
text(
"SELECT geocodigo FROM weather.copernicus_brasil"
f" WHERE date = '{str(max_update_delay.date())}'"
f"SELECT geocode FROM weather.{tablename}"
f" WHERE date = '{str(dt)}'"
)
)
table_geocodes = set(chain(*cur.fetchall()))

all_geocodes = set([mun["geocodigo"] for mun in MUNICIPALITIES])
all_geocodes = set([adm.code for adm in ADM2.filter(adm0=locale)])
geocodes = all_geocodes.difference(table_geocodes)
print("TABLE_GEO ", f"[{len(table_geocodes)}]: ", table_geocodes)
print("DIFF_GEO: ", f"[{len(geocodes)}]: ", geocodes)

if not geocodes:
return "There is no geocode to fetch"

# Downloads daily dataset
netcdf_file = sat_d.download_br_netcdf(
date=str(max_update_delay.date()),
data_dir=data_dir,
user_key=api_key["CDSAPI_KEY"],
)

print(f"Handling {netcdf_file}")

# Reads the NetCDF4 file using Xarray
ds = sat_w.load_dataset(netcdf_file)

with create_engine(psql_uri["PSQL_MAIN_URI"]).connect() as conn:
ds.copebr.to_sql(
tablename="copernicus_brasil",
schema="weather",
geocodes=list(geocodes),
con=conn,
)

# Deletes the NetCDF4 file
Path(netcdf_file).unlink(missing_ok=True)

return f"{len(geocodes)} inserted into DB."

# Instantiate the Task
ETL = extract_transform_load(DATE, DATA_DIR, KEY, URI)

ETL # Execute
with request.reanalysis_era5_land(
str(dt).replace("-", "_") + locale,
api_token=api_key,
date=str(dt),
locale=locale,
) as ds:
for adm in ADM2.filter(adm0=locale):
with engine.connect() as conn:
ds.cope.to_sql(adm, conn, tablename, "weather")

fetch_ds("BRA", DATE, URI["PSQL_MAIN_URI"], KEY["CDSAPI_KEY"])