From 1582d781dfa60101d445b2e881b3ba69373ee358 Mon Sep 17 00:00:00 2001 From: ErnestaP Date: Thu, 15 Aug 2024 17:19:45 +0200 Subject: [PATCH] Inspire Matomo: visits per day DAG --- README.md | 2 +- .../models/inspire_matomo/inspire_matomo.py | 15 +++ .../inspire_visits_per_day_dag.py | 96 +++++++++++++++++++ dags/inspire_matomo/utils.py | 22 +++++ 4 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 dags/common/models/inspire_matomo/inspire_matomo.py create mode 100644 dags/inspire_matomo/inspire_visits_per_day_dag.py create mode 100644 dags/inspire_matomo/utils.py diff --git a/README.md b/README.md index 4f6eec5..afa2dcd 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ airflow standalone ### 6. Start Postgres with Docker Compose If you're using Docker to manage your Postgres database, start the service. -IMPORTANT: Please add CDS_TOKEN value in Docker compose file +IMPORTANT: Please add CDS_TOKEN and MATOMO_AUTH_TOKEN value in Docker compose file ```sh docker-compose -f docker-compose.standalone.yaml up diff --git a/dags/common/models/inspire_matomo/inspire_matomo.py b/dags/common/models/inspire_matomo/inspire_matomo.py new file mode 100644 index 0000000..e23162f --- /dev/null +++ b/dags/common/models/inspire_matomo/inspire_matomo.py @@ -0,0 +1,15 @@ +from sqlalchemy import Column, Date, DateTime, Integer, func +from sqlalchemy.ext.declarative import declarative_base + +Base = declarative_base() + + +class MatomoData(Base): + __tablename__ = "inspire_matomo_data" + + id = Column(Integer, primary_key=True) + date = Column(Date) + visits = Column(Integer) + unique_visitors = Column(Integer) + created_at = Column(DateTime, default=func.now()) + updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) diff --git a/dags/inspire_matomo/inspire_visits_per_day_dag.py b/dags/inspire_matomo/inspire_visits_per_day_dag.py new file mode 100644 index 0000000..dee016b --- /dev/null +++ b/dags/inspire_matomo/inspire_visits_per_day_dag.py @@ -0,0 +1,96 @@ +import json +import os +from datetime import datetime + +import pendulum +from airflow.decorators import dag, task +from airflow.exceptions import AirflowException +from airflow.providers.http.hooks.http import HttpHook +from common.models.inspire_matomo.inspire_matomo import MatomoData +from common.operators.sqlalchemy_operator import sqlalchemy_task +from executor_config import kubernetes_executor_config +from inspire_matomo.utils import get_parameters +from tenacity import retry_if_exception_type, stop_after_attempt + +now = datetime.now() +formatted_date = now.strftime("%Y-%m-%d") + + +@dag( + start_date=pendulum.today("UTC").add(days=-1), + schedule="@monthly", + params={"period": "day", "date": formatted_date}, +) +def inspire_visits_per_day_dag(): + @task(executor_config=kubernetes_executor_config) + def fetch_visits_per_day(**kwargs): + http_hook = HttpHook(http_conn_id="matomo", method="GET") + period = kwargs["params"].get("period") + date = kwargs["params"].get("date") + parameters = get_parameters( + period=period, date=date, endpoint_key="visits_per_day" + ) + token = os.environ.get("MATOMO_AUTH_TOKEN") + response = http_hook.run_with_advanced_retry( + endpoint="/index.php", + data=parameters, + headers={"Authorization": f"Bearer {token}"}, + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(AirflowException), + }, + ) + return response.text + + @task(executor_config=kubernetes_executor_config) + def fetch_unique_visitors_per_day(**kwargs): + http_hook = HttpHook(http_conn_id="matomo", method="GET") + period = kwargs["params"].get("period") + date = kwargs["params"].get("date") + parameters = get_parameters( + period=period, date=date, endpoint_key="unique_visitors" + ) + token = os.environ.get("MATOMO_AUTH_TOKEN") + response = http_hook.run_with_advanced_retry( + endpoint="/index.php", + data=parameters, + headers={"Authorization": f"Bearer {token}"}, + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(AirflowException), + }, + ) + return response.text + + @sqlalchemy_task(conn_id="superset") + def populate_database(visits_per_day, unique_visitors_per_day, session, **kwargs): + visits_per_day_json = json.loads(visits_per_day) + unique_visitors_per_day_json = json.loads(unique_visitors_per_day) + date = kwargs["params"].get("date") + + record = ( + session.query(MatomoData) + .filter_by(date=visits_per_day_json.get("date")) + .first() + ) + if record: + record.visits = int(visits_per_day_json.visits) + record.unique_visitors = int(unique_visitors_per_day_json.unique_visitors) + else: + parsed_date = datetime.strptime(date, "%Y-%m-%d").date() + new_record = MatomoData( + visits=int(visits_per_day_json.get("value")), + unique_visitors=int(unique_visitors_per_day_json.get("value")), + date=parsed_date, + ) + session.add(new_record) + + visits_per_day = fetch_visits_per_day() + unique_visitors_per_day = fetch_unique_visitors_per_day() + populate_database( + visits_per_day=visits_per_day, + unique_visitors_per_day=unique_visitors_per_day, + ) + + +inspire_visits_per_day = inspire_visits_per_day_dag() diff --git a/dags/inspire_matomo/utils.py b/dags/inspire_matomo/utils.py new file mode 100644 index 0000000..40eca3c --- /dev/null +++ b/dags/inspire_matomo/utils.py @@ -0,0 +1,22 @@ +import os + + +def get_endpoint(key): + end_point_map = { + "visits_per_day": "VisitsSummary.getVisits", + "unique_visitors": "VisitsSummary.getUniqueVisitors", + } + return end_point_map[key] + + +def get_parameters(period, date, endpoint_key): + return { + "module": "API", + "token_auth": os.environ.get("MATOMO_AUTH_TOKEN"), + "idSite": os.environ.get("MATOMO_SITE_ID"), + "date": str(date), + "period": period, + "format": "json", + "module": "API", + "method": get_endpoint(endpoint_key), + }